You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/08/26 12:30:33 UTC

svn commit: r807954 - in /hadoop/mapreduce/trunk: ./ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/ src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/ src/test/mapred/org/apache/hadoop/mapreduce/

Author: cdouglas
Date: Wed Aug 26 10:30:32 2009
New Revision: 807954

URL: http://svn.apache.org/viewvc?rev=807954&view=rev
Log:
MAPREDUCE-788. Update gridmix2 to use the new API. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/
    hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/CombinerJobCreator.java
      - copied, changed from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java
    hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GenericMRLoadJobCreator.java
      - copied, changed from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java
    hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GridMixRunner.java
      - copied, changed from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
      - copied, changed from r807951, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
Removed:
    hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/benchmarks/gridmix2/gridmix_config.xml
    hadoop/mapreduce/trunk/src/benchmarks/gridmix2/rungridmix_2

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=807954&r1=807953&r2=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Aug 26 10:30:32 2009
@@ -271,6 +271,9 @@
 
     MAPREDUCE-910. Support counters in MRUnit. (Aaron Kimball via cdouglas)
 
+    MAPREDUCE-788. Update gridmix2 to use the new API. (Amareshwari Sriramadasu
+    via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/benchmarks/gridmix2/gridmix_config.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/benchmarks/gridmix2/gridmix_config.xml?rev=807954&r1=807953&r2=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/benchmarks/gridmix2/gridmix_config.xml (original)
+++ hadoop/mapreduce/trunk/src/benchmarks/gridmix2/gridmix_config.xml Wed Aug 26 10:30:32 2009
@@ -39,7 +39,7 @@
 
 <property>
   <name>streamSort.smallJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+  <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 
@@ -74,7 +74,7 @@
 </property>
 <property>
   <name>streamSort.mediumJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>
@@ -131,7 +131,7 @@
 </property>
 <property>
   <name>javaSort.smallJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+  <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 <property>
@@ -160,7 +160,7 @@
 </property>
 <property>
   <name>javaSort.mediumJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>
@@ -217,7 +217,7 @@
 </property>
 <property>
   <name>combiner.smallJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+  <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 <property>
@@ -246,7 +246,7 @@
 </property>
 <property>
   <name>combiner.mediumJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>
@@ -303,7 +303,7 @@
 </property>
 <property>
   <name>monsterQuery.smallJobs.inputFiles</name>
-  <value>${FIXCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+  <value>${FIXCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 <property>
@@ -332,7 +332,7 @@
 </property>
 <property>
   <name>monsterQuery.mediumJobs.inputFiles</name>
-  <value>${FIXCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${FIXCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>
@@ -390,7 +390,7 @@
 
 <property>
   <name>webdataScan.smallJobs.inputFiles</name>
-  <value>${VARCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+  <value>${VARCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 <property>
@@ -413,7 +413,7 @@
 
 <property>
   <name>webdataScan.mediumJobs.inputFiles</name>
-  <value>${VARCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${VARCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>
@@ -469,7 +469,7 @@
 </property>
 <property>
   <name>webdataSort.smallJobs.inputFiles</name>
-  <value>${VARCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+  <value>${VARCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 <property>
@@ -498,7 +498,7 @@
 </property>
 <property>
   <name>webdataSort.mediumJobs.inputFiles</name>
-  <value>${VARCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${VARCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>

Modified: hadoop/mapreduce/trunk/src/benchmarks/gridmix2/rungridmix_2
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/benchmarks/gridmix2/rungridmix_2?rev=807954&r1=807953&r2=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/benchmarks/gridmix2/rungridmix_2 (original)
+++ hadoop/mapreduce/trunk/src/benchmarks/gridmix2/rungridmix_2 Wed Aug 26 10:30:32 2009
@@ -30,7 +30,7 @@
 
 export HADOOP_CLASSPATH=${APP_JAR}:${EXAMPLE_JAR}:${STREAMING_JAR}
 export LIBJARS=${APP_JAR},${EXAMPLE_JAR},${STREAMING_JAR}
-${HADOOP_HOME}/bin/hadoop jar gridmix.jar org.apache.hadoop.mapred.GridMixRunner -libjars ${LIBJARS}
+${HADOOP_HOME}/bin/hadoop jar gridmix.jar org.apache.hadoop.mapreduce.GridMixRunner -libjars ${LIBJARS}
 
 Date=`date +%F-%H-%M-%S-%N`
 echo $Date >  $1_end.out

Copied: hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/CombinerJobCreator.java (from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/CombinerJobCreator.java?p2=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/CombinerJobCreator.java&p1=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java&r1=807951&r2=807954&rev=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java (original)
+++ hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/CombinerJobCreator.java Wed Aug 26 10:30:32 2009
@@ -16,75 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.StringTokenizer;
+package org.apache.hadoop.mapreduce;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
 
 public class CombinerJobCreator {
 
-   public static class MapClass extends MapReduceBase
-     implements Mapper<LongWritable, Text, Text, IntWritable> {
-
-     private final static IntWritable one = new IntWritable(1);
-     private Text word = new Text();
-
-     public void map(LongWritable key, Text value,
-                     OutputCollector<Text, IntWritable> output,
-                     Reporter reporter) throws IOException {
-       String line = value.toString();
-       StringTokenizer itr = new StringTokenizer(line);
-       while (itr.hasMoreTokens()) {
-         word.set(itr.nextToken());
-         output.collect(word, one);
-       }
-     }
-   }
-
-   public static class Reduce extends MapReduceBase
-     implements Reducer<Text, IntWritable, Text, IntWritable> {
-
-     public void reduce(Text key, Iterator<IntWritable> values,
-                        OutputCollector<Text, IntWritable> output,
-                        Reporter reporter) throws IOException {
-       int sum = 0;
-       while (values.hasNext()) {
-         sum += values.next().get();
-       }
-       output.collect(key, new IntWritable(sum));
-     }
-   }
-
-  public static JobConf createJob(String[] args) throws Exception {
-    JobConf conf = new JobConf(CombinerJobCreator.class);
-    conf.setJobName("GridmixCombinerJob");
-
-    // the keys are words (strings)
-    conf.setOutputKeyClass(Text.class);
-    // the values are counts (ints)
-    conf.setOutputValueClass(IntWritable.class);
-
-    conf.setMapperClass(MapClass.class);
-    conf.setCombinerClass(Reduce.class);
-    conf.setReducerClass(Reduce.class);
+  public static Job createJob(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    int numReduces = 1;
+    String indir = null;
+    String outdir = null;
     boolean mapoutputCompressed = false;
     boolean outputCompressed = false;
-    // List<String> other_args = new ArrayList<String>();
     for (int i = 0; i < args.length; ++i) {
       try {
         if ("-r".equals(args[i])) {
-          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
+          numReduces = Integer.parseInt(args[++i]);
         } else if ("-indir".equals(args[i])) {
-          FileInputFormat.setInputPaths(conf, args[++i]);
+          indir = args[++i];
         } else if ("-outdir".equals(args[i])) {
-          FileOutputFormat.setOutputPath(conf, new Path(args[++i]));
-
+          outdir = args[++i];
         } else if ("-mapoutputCompressed".equals(args[i])) {
           mapoutputCompressed = Boolean.valueOf(args[++i]).booleanValue();
         } else if ("-outputCompressed".equals(args[i])) {
@@ -99,8 +58,28 @@
         return null;
       }
     }
-    conf.setCompressMapOutput(mapoutputCompressed);
+    conf.setBoolean("mapred.compress.map.output", mapoutputCompressed);
     conf.setBoolean("mapred.output.compress", outputCompressed);
-    return conf;
+
+    Job job = new Job(conf);
+    job.setJobName("GridmixCombinerJob");
+
+    // the keys are words (strings)
+    job.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    job.setOutputValueClass(IntWritable.class);
+
+    job.setMapperClass(TokenCounterMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+
+    job.setNumReduceTasks(numReduces);
+    if (indir != null) {
+      FileInputFormat.setInputPaths(job, indir);
+    }
+    if (outdir != null) {
+      FileOutputFormat.setOutputPath(job, new Path(outdir));
+    }
+    return job;
   }
 }

Copied: hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GenericMRLoadJobCreator.java (from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GenericMRLoadJobCreator.java?p2=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GenericMRLoadJobCreator.java&p1=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java&r1=807951&r2=807954&rev=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java (original)
+++ hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GenericMRLoadJobCreator.java Wed Aug 26 10:30:32 2009
@@ -16,27 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapreduce;
 
 import java.util.Random;
 import java.util.Stack;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.GenericMRLoadGenerator;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
 
 public class GenericMRLoadJobCreator extends GenericMRLoadGenerator {
 
-  public static JobConf createJob(String[] argv, boolean mapoutputCompressed,
+  public static Job createJob(String[] argv, boolean mapoutputCompressed,
       boolean outputCompressed) throws Exception {
 
-    JobConf job = new JobConf();
+    Job job = new Job();
     job.setJarByClass(GenericMRLoadGenerator.class);
     job.setMapperClass(SampleMapper.class);
     job.setReducerClass(SampleReducer.class);
@@ -46,28 +48,29 @@
 
     if (null == FileOutputFormat.getOutputPath(job)) {
       // No output dir? No writes
-      job.setOutputFormat(NullOutputFormat.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
     }
 
+    Configuration conf = job.getConfiguration();
     if (0 == FileInputFormat.getInputPaths(job).length) {
       // No input dir? Generate random data
       System.err.println("No input path; ignoring InputFormat");
       confRandom(job);
-    } else if (null != job.getClass("mapred.indirect.input.format", null)) {
+    } else if (null != conf.getClass("mapred.indirect.input.format", null)) {
       // specified IndirectInputFormat? Build src list
-      JobClient jClient = new JobClient(job);
+      JobClient jClient = new JobClient(conf);
       Path sysdir = jClient.getSystemDir();
       Random r = new Random();
       Path indirInputFile = new Path(sysdir, Integer.toString(r
           .nextInt(Integer.MAX_VALUE), 36)
           + "_files");
-      job.set("mapred.indirect.input.file", indirInputFile.toString());
+      conf.set("mapred.indirect.input.file", indirInputFile.toString());
       SequenceFile.Writer writer = SequenceFile.createWriter(sysdir
-          .getFileSystem(job), job, indirInputFile, LongWritable.class,
+          .getFileSystem(conf), conf, indirInputFile, LongWritable.class,
           Text.class, SequenceFile.CompressionType.NONE);
       try {
         for (Path p : FileInputFormat.getInputPaths(job)) {
-          FileSystem fs = p.getFileSystem(job);
+          FileSystem fs = p.getFileSystem(conf);
           Stack<Path> pathstack = new Stack<Path>();
           pathstack.push(p);
           while (!pathstack.empty()) {
@@ -89,10 +92,9 @@
       }
     }
 
-    job.setCompressMapOutput(mapoutputCompressed);
-    job.setBoolean("mapred.output.compress", outputCompressed);
+    conf.setBoolean("mapred.compress.map.output", mapoutputCompressed);
+    conf.setBoolean("mapred.output.compress", outputCompressed);
     return job;
-
   }
 
 }

Copied: hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GridMixRunner.java (from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GridMixRunner.java?p2=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GridMixRunner.java&p1=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java&r1=807951&r2=807954&rev=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java (original)
+++ hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GridMixRunner.java Wed Aug 26 10:30:32 2009
@@ -16,15 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
@@ -32,11 +32,15 @@
 import org.apache.hadoop.examples.Sort;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.jobcontrol.*;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
 import org.apache.hadoop.streaming.StreamJob;
 
 public class GridMixRunner {
@@ -66,17 +70,17 @@
 
   private enum Size {
     SMALL("small",                               // name
-          "/{part-00000,part-00001,part-00002}", // default input subset
-          NUM_OF_SMALL_JOBS_PER_CLASS,           // defuault num jobs
-          NUM_OF_REDUCERS_FOR_SMALL_JOB),        // default num reducers
+      "/{part-*-00000,part-*-00001,part-*-00002}", // default input subset
+      NUM_OF_SMALL_JOBS_PER_CLASS,           // default num jobs
+      NUM_OF_REDUCERS_FOR_SMALL_JOB),        // default num reducers
     MEDIUM("medium",                             // name
-          "/{part-000*0,part-000*1,part-000*2}", // default input subset
-          NUM_OF_MEDIUM_JOBS_PER_CLASS,          // defuault num jobs
-          NUM_OF_REDUCERS_FOR_MEDIUM_JOB),       // default num reducers
+      "/{part-*-000*0, part-*-000*1, part-*-000*2}", // default input subset
+      NUM_OF_MEDIUM_JOBS_PER_CLASS,          // default num jobs
+      NUM_OF_REDUCERS_FOR_MEDIUM_JOB),       // default num reducers
     LARGE("large",                               // name
-          "",                                    // default input subset
-          NUM_OF_LARGE_JOBS_PER_CLASS,           // defuault num jobs
-          NUM_OF_REDUCERS_FOR_LARGE_JOB);        // default num reducers
+      "",                                    // default input subset
+      NUM_OF_LARGE_JOBS_PER_CLASS,           // default num jobs
+      NUM_OF_REDUCERS_FOR_LARGE_JOB);        // default num reducers
 
     private final String str;
     private final String path;
@@ -107,7 +111,8 @@
     public void addJob(int numReducers, boolean mapoutputCompressed,
         boolean outputCompressed, Size size, JobControl gridmix) {
       final String prop = String.format("streamSort.%sJobs.inputFiles", size);
-      final String indir = getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
+      final String indir = 
+        getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
       final String outdir = addTSSuffix("perf-out/stream-out-dir-" + size);
 
       StringBuffer sb = new StringBuffer();
@@ -120,12 +125,12 @@
 
       clearDir(outdir);
       try {
-        JobConf jobconf = StreamJob.createJob(args);
-        jobconf.setJobName("GridmixStreamingSorter." + size);
-        jobconf.setCompressMapOutput(mapoutputCompressed);
-        jobconf.setBoolean("mapred.output.compress", outputCompressed);
-        Job job = new Job(jobconf);
-        gridmix.addJob(job);
+        Configuration conf = StreamJob.createJob(args);
+        conf.setBoolean("mapred.output.compress", outputCompressed);
+        conf.setBoolean("mapred.compress.map.output", mapoutputCompressed);
+        Job job = new Job(conf, "GridmixStreamingSorter." + size);
+        ControlledJob cjob = new ControlledJob(job, null);
+        gridmix.addJob(cjob);
       } catch (Exception ex) {
         ex.printStackTrace();
       }
@@ -136,33 +141,34 @@
     public void addJob(int numReducers, boolean mapoutputCompressed,
         boolean outputCompressed, Size size, JobControl gridmix) {
       final String prop = String.format("javaSort.%sJobs.inputFiles", size);
-      final String indir = getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
+      final String indir = getInputDirsFor(prop,
+                             size.defaultPath(VARINFLTEXT));
       final String outdir = addTSSuffix("perf-out/sort-out-dir-" + size);
 
       clearDir(outdir);
 
       try {
-        JobConf jobConf = new JobConf();
-        jobConf.setJarByClass(Sort.class);
-        jobConf.setJobName("GridmixJavaSorter." + size);
-        jobConf.setMapperClass(IdentityMapper.class);
-        jobConf.setReducerClass(IdentityReducer.class);
-
-        jobConf.setNumReduceTasks(numReducers);
-        jobConf.setInputFormat(org.apache.hadoop.mapred.KeyValueTextInputFormat.class);
-        jobConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
-
-        jobConf.setOutputKeyClass(org.apache.hadoop.io.Text.class);
-        jobConf.setOutputValueClass(org.apache.hadoop.io.Text.class);
-        jobConf.setCompressMapOutput(mapoutputCompressed);
-        jobConf.setBoolean("mapred.output.compress", outputCompressed);
+        Configuration conf = new Configuration();
+        conf.setBoolean("mapred.output.compress", outputCompressed);
+        conf.setBoolean("mapred.compress.map.output", mapoutputCompressed);
+        Job job = new Job(conf);
+        job.setJarByClass(Sort.class);
+        job.setJobName("GridmixJavaSorter." + size);
+        job.setMapperClass(Mapper.class);
+        job.setReducerClass(Reducer.class);
+
+        job.setNumReduceTasks(numReducers);
+        job.setInputFormatClass(KeyValueTextInputFormat.class);
+        job.setOutputFormatClass(TextOutputFormat.class);
 
-        FileInputFormat.addInputPaths(jobConf, indir);
-        FileOutputFormat.setOutputPath(jobConf, new Path(outdir));
+        job.setOutputKeyClass(org.apache.hadoop.io.Text.class);
+        job.setOutputValueClass(org.apache.hadoop.io.Text.class);
 
-        Job job = new Job(jobConf);
-        gridmix.addJob(job);
+        FileInputFormat.addInputPaths(job, indir);
+        FileOutputFormat.setOutputPath(job, new Path(outdir));
 
+        ControlledJob cjob = new ControlledJob(job, null);
+        gridmix.addJob(cjob);
       } catch (Exception ex) {
         ex.printStackTrace();
       }
@@ -174,12 +180,17 @@
         boolean outputCompressed, Size size, JobControl gridmix) {
       final String prop = String.format("webdataScan.%sJobs.inputFiles", size);
       final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
-      final String outdir = addTSSuffix("perf-out/webdata-scan-out-dir-" + size);
+      final String outdir = addTSSuffix("perf-out/webdata-scan-out-dir-"
+                              + size);
       StringBuffer sb = new StringBuffer();
       sb.append("-keepmap 0.2 ");
       sb.append("-keepred 5 ");
-      sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
-      sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
+      sb.append("-inFormat");
+      sb.append(" org.apache.hadoop.mapreduce." +
+        "lib.input.SequenceFileInputFormat ");
+      sb.append("-outFormat");
+      sb.append(" org.apache.hadoop.mapreduce." +
+        "lib.output.SequenceFileOutputFormat ");
       sb.append("-outKey org.apache.hadoop.io.Text ");
       sb.append("-outValue org.apache.hadoop.io.Text ");
       sb.append("-indir ").append(indir).append(" ");
@@ -189,11 +200,11 @@
       String[] args = sb.toString().split(" ");
       clearDir(outdir);
       try {
-        JobConf jobconf = GenericMRLoadJobCreator.createJob(
+        Job job = GenericMRLoadJobCreator.createJob(
             args, mapoutputCompressed, outputCompressed);
-        jobconf.setJobName("GridmixWebdatascan." + size);
-        Job job = new Job(jobconf);
-        gridmix.addJob(job);
+        job.setJobName("GridmixWebdatascan." + size);
+        ControlledJob cjob = new ControlledJob(job, null);
+        gridmix.addJob(cjob);
       } catch (Exception ex) {
         System.out.println(ex.getStackTrace());
       }
@@ -211,16 +222,17 @@
       sb.append("-r ").append(numReducers).append(" ");
       sb.append("-indir ").append(indir).append(" ");
       sb.append("-outdir ").append(outdir);
-      sb.append("-mapoutputCompressed ").append(mapoutputCompressed).append(" ");
+      sb.append("-mapoutputCompressed ");
+      sb.append(mapoutputCompressed).append(" ");
       sb.append("-outputCompressed ").append(outputCompressed);
 
       String[] args = sb.toString().split(" ");
       clearDir(outdir);
       try {
-        JobConf jobconf = CombinerJobCreator.createJob(args);
-        jobconf.setJobName("GridmixCombinerJob." + size);
-        Job job = new Job(jobconf);
-        gridmix.addJob(job);
+        Job job = CombinerJobCreator.createJob(args);
+        job.setJobName("GridmixCombinerJob." + size);
+        ControlledJob cjob = new ControlledJob(job, null);
+        gridmix.addJob(cjob);
       } catch (Exception ex) {
         ex.printStackTrace();
       }
@@ -230,13 +242,14 @@
     MONSTERQUERY("monsterQuery") {
     public void addJob(int numReducers, boolean mapoutputCompressed,
         boolean outputCompressed, Size size, JobControl gridmix) {
-      final String prop = String.format("monsterQuery.%sJobs.inputFiles", size);
+      final String prop = 
+        String.format("monsterQuery.%sJobs.inputFiles", size);
       final String indir = getInputDirsFor(prop, size.defaultPath(FIXCOMPSEQ));
       final String outdir = addTSSuffix("perf-out/mq-out-dir-" + size);
       int iter = 3;
       try {
-        Job pjob = null;
-        Job job = null;
+        ControlledJob pjob = null;
+        ControlledJob cjob = null;
         for (int i = 0; i < iter; i++) {
           String outdirfull = outdir + "." + i;
           String indirfull = (0 == i) ? indir : outdir + "." + (i - 1);
@@ -245,8 +258,12 @@
           StringBuffer sb = new StringBuffer();
           sb.append("-keepmap 10 ");
           sb.append("-keepred 40 ");
-          sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
-          sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
+          sb.append("-inFormat");
+          sb.append(" org.apache.hadoop.mapreduce." +
+            "lib.input.SequenceFileInputFormat ");
+          sb.append("-outFormat");
+          sb.append(" org.apache.hadoop.mapreduce." +
+            "lib.output.SequenceFileOutputFormat ");
           sb.append("-outKey org.apache.hadoop.io.Text ");
           sb.append("-outValue org.apache.hadoop.io.Text ");
           sb.append("-indir ").append(indirfull).append(" ");
@@ -260,15 +277,15 @@
             System.out.println(ex.toString());
           }
 
-          JobConf jobconf = GenericMRLoadJobCreator.createJob(
+          Job job = GenericMRLoadJobCreator.createJob(
               args, mapoutputCompressed, outputCompressed);
-          jobconf.setJobName("GridmixMonsterQuery." + size);
-          job = new Job(jobconf);
+          job.setJobName("GridmixMonsterQuery." + size);
+          cjob = new ControlledJob(job, null);
           if (pjob != null) {
-            job.addDependingJob(pjob);
+            cjob.addDependingJob(pjob);
           }
-          gridmix.addJob(job);
-          pjob = job;
+          gridmix.addJob(cjob);
+          pjob = cjob;
         }
       } catch (Exception e) {
         System.out.println(e.getStackTrace());
@@ -281,13 +298,16 @@
         boolean outputCompressed, Size size, JobControl gridmix) {
       final String prop = String.format("webdataSort.%sJobs.inputFiles", size);
       final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
-      final String outdir = addTSSuffix("perf-out/webdata-sort-out-dir-" + size);
+      final String outdir = 
+        addTSSuffix("perf-out/webdata-sort-out-dir-" + size);
 
       StringBuffer sb = new StringBuffer();
       sb.append("-keepmap 100 ");
       sb.append("-keepred 100 ");
-      sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
-      sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
+      sb.append("-inFormat org.apache.hadoop.mapreduce." +
+        "lib.input.SequenceFileInputFormat ");
+      sb.append("-outFormat org.apache.hadoop.mapreduce." +
+        "lib.output.SequenceFileOutputFormat ");
       sb.append("-outKey org.apache.hadoop.io.Text ");
       sb.append("-outValue org.apache.hadoop.io.Text ");
       sb.append("-indir ").append(indir).append(" ");
@@ -297,11 +317,11 @@
       String[] args = sb.toString().split(" ");
       clearDir(outdir);
       try {
-        JobConf jobconf = GenericMRLoadJobCreator.createJob(
+        Job job = GenericMRLoadJobCreator.createJob(
             args, mapoutputCompressed, outputCompressed);
-        jobconf.setJobName("GridmixWebdataSort." + size);
-        Job job = new Job(jobconf);
-        gridmix.addJob(job);
+        job.setJobName("GridmixWebdataSort." + size);
+        ControlledJob cjob = new ControlledJob(job, null);
+        gridmix.addJob(cjob);
       } catch (Exception ex) {
         System.out.println(ex.getStackTrace());
       }
@@ -443,7 +463,8 @@
     for (GridMixJob jobtype : EnumSet.allOf(GridMixJob.class)) {
       addAllJobs(jobtype);
     }
-    System.out.println("total " + gridmix.getWaitingJobs().size() + " jobs");
+    System.out.println("total " + 
+      gridmix.getWaitingJobList().size() + " jobs");
   }
 
   class SimpleStats {
@@ -480,35 +501,35 @@
     }
   }
 
-  private TreeMap<String, String> getStatForJob(Job job) {
+  private TreeMap<String, String> getStatForJob(ControlledJob cjob) {
     TreeMap<String, String> retv = new TreeMap<String, String>();
-    String mapreduceID = job.getAssignedJobID().toString();
-    JobClient jc = job.getJobClient();
-    JobConf jobconf = job.getJobConf();
-    String jobName = jobconf.getJobName();
-    retv.put("JobId", mapreduceID);
+    JobID mapreduceID = cjob.getMapredJobID();
+    Job job = cjob.getJob();
+    String jobName = job.getJobName();
+    retv.put("JobId", mapreduceID.toString());
     retv.put("JobName", jobName);
 
     TaskExecutionStats theTaskExecutionStats = new TaskExecutionStats();
 
     try {
-      RunningJob running = jc.getJob(JobID.forName(mapreduceID));
-      Counters jobCounters = running.getCounters();
-      Iterator<Group> groups = jobCounters.iterator();
+      Counters jobCounters = job.getCounters();
+      Iterator<CounterGroup> groups = jobCounters.iterator();
       while (groups.hasNext()) {
-        Group g = groups.next();
+        CounterGroup g = groups.next();
         String gn = g.getName();
-        Iterator<Counters.Counter> cs = g.iterator();
+        Iterator<Counter> cs = g.iterator();
         while (cs.hasNext()) {
-          Counters.Counter c = cs.next();
+          Counter c = cs.next();
           String n = c.getName();
-          long v = c.getCounter();
+          long v = c.getValue();
           retv.put(mapreduceID + "." + jobName + "." + gn + "." + n, "" + v);
         }
       }
-      TaskReport[] maps = jc.getMapTaskReports(JobID.forName(mapreduceID));
+      JobClient jc = new JobClient(job.getConfiguration());
+      TaskReport[] maps = jc
+          .getMapTaskReports((org.apache.hadoop.mapred.JobID)mapreduceID);
       TaskReport[] reduces = jc
-          .getReduceTaskReports(JobID.forName(mapreduceID));
+          .getReduceTaskReports((org.apache.hadoop.mapred.JobID)mapreduceID);
       retv.put(mapreduceID + "." + jobName + "." + "numOfMapTasks", ""
           + maps.length);
       retv.put(mapreduceID + "." + jobName + "." + "numOfReduceTasks", ""
@@ -562,15 +583,15 @@
           + startTime);
       retv.put(mapreduceID + "." + jobName + "." + "reduceEndTime", ""
           + finishTime);
-      if (job.getState() == Job.SUCCESS) {
+      if (cjob.getJobState() == ControlledJob.State.SUCCESS) {
         retv.put(mapreduceID + "." + "jobStatus", "successful");
-      } else if (job.getState() == Job.FAILED) {
+      } else if (cjob.getJobState() == ControlledJob.State.FAILED) {
         retv.put(mapreduceID + "." + jobName + "." + "jobStatus", "failed");
       } else {
         retv.put(mapreduceID + "." + jobName + "." + "jobStatus", "unknown");
       }
-      Iterator<Entry<String, SimpleStats>> entries = theTaskExecutionStats.theStats
-          .entrySet().iterator();
+      Iterator<Entry<String, SimpleStats>> entries = 
+        theTaskExecutionStats.theStats.entrySet().iterator();
       while (entries.hasNext()) {
         Entry<String, SimpleStats> e = entries.next();
         SimpleStats v = e.getValue();
@@ -599,7 +620,7 @@
     }
   }
 
-  private void printStatsForJobs(ArrayList<Job> jobs) {
+  private void printStatsForJobs(List<ControlledJob> jobs) {
     for (int i = 0; i < jobs.size(); i++) {
       printJobStat(getStatForJob(jobs.get(i)));
     }
@@ -612,15 +633,15 @@
     long startTime = System.currentTimeMillis();
     while (!gridmix.allFinished()) {
       System.out.println("Jobs in waiting state: "
-          + gridmix.getWaitingJobs().size());
+          + gridmix.getWaitingJobList().size());
       System.out.println("Jobs in ready state: "
-          + gridmix.getReadyJobs().size());
+          + gridmix.getReadyJobsList().size());
       System.out.println("Jobs in running state: "
-          + gridmix.getRunningJobs().size());
+          + gridmix.getRunningJobList().size());
       System.out.println("Jobs in success state: "
-          + gridmix.getSuccessfulJobs().size());
+          + gridmix.getSuccessfulJobList().size());
       System.out.println("Jobs in failed state: "
-          + gridmix.getFailedJobs().size());
+          + gridmix.getFailedJobList().size());
       System.out.println("\n");
 
       try {
@@ -630,8 +651,8 @@
       }
     }
     long endTime = System.currentTimeMillis();
-    ArrayList<Job> fail = gridmix.getFailedJobs();
-    ArrayList<Job> succeed = gridmix.getSuccessfulJobs();
+    List<ControlledJob> fail = gridmix.getFailedJobList();
+    List<ControlledJob> succeed = gridmix.getSuccessfulJobList();
     int numOfSuccessfulJob = succeed.size();
     if (numOfSuccessfulJob > 0) {
       System.out.println(numOfSuccessfulJob + " jobs succeeded");
@@ -646,7 +667,7 @@
     }
     System.out.println("GridMix results:");
     System.out.println("Total num of Jobs: " + numOfJobs);
-    System.out.println("ExecutionTime: " + ((endTime-startTime)/1000));
+    System.out.println("ExecutionTime: " + ((endTime-startTime) / 1000));
     gridmix.stop();
   }
 

Copied: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java (from r807951, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java?p2=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java&p1=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java&r1=807951&r2=807954&rev=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java Wed Aug 26 10:30:32 2009
@@ -16,14 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapreduce;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 import java.util.Stack;
 
@@ -38,7 +38,13 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
@@ -61,7 +67,7 @@
   /**
    * Configure a job given argv.
    */
-  public static boolean parseArgs(String[] argv, JobConf job) throws IOException {
+  public static boolean parseArgs(String[] argv, Job job) throws IOException {
     if (argv.length < 1) {
       return 0 == printUsage();
     }
@@ -72,15 +78,13 @@
         return 0 == printUsage();
       }
       try {
-        if ("-m".equals(argv[i])) {
-          job.setNumMapTasks(Integer.parseInt(argv[++i]));
-        } else if ("-r".equals(argv[i])) {
+        if ("-r".equals(argv[i])) {
           job.setNumReduceTasks(Integer.parseInt(argv[++i]));
         } else if ("-inFormat".equals(argv[i])) {
-          job.setInputFormat(
+          job.setInputFormatClass(
               Class.forName(argv[++i]).asSubclass(InputFormat.class));
         } else if ("-outFormat".equals(argv[i])) {
-          job.setOutputFormat(
+          job.setOutputFormatClass(
               Class.forName(argv[++i]).asSubclass(OutputFormat.class));
         } else if ("-outKey".equals(argv[i])) {
           job.setOutputKeyClass(
@@ -89,18 +93,20 @@
           job.setOutputValueClass(
             Class.forName(argv[++i]).asSubclass(Writable.class));
         } else if ("-keepmap".equals(argv[i])) {
-          job.set("hadoop.sort.map.keep.percent", argv[++i]);
+          job.getConfiguration().set("hadoop.sort.map.keep.percent", 
+                argv[++i]);
         } else if ("-keepred".equals(argv[i])) {
-          job.set("hadoop.sort.reduce.keep.percent", argv[++i]);
+          job.getConfiguration().set("hadoop.sort.reduce.keep.percent", 
+                argv[++i]);
         } else if ("-outdir".equals(argv[i])) {
           FileOutputFormat.setOutputPath(job, new Path(argv[++i]));
         } else if ("-indir".equals(argv[i])) {
           FileInputFormat.addInputPaths(job, argv[++i]);
         } else if ("-inFormatIndirect".equals(argv[i])) {
-          job.setClass("mapred.indirect.input.format",
+          job.getConfiguration().setClass("mapred.indirect.input.format",
               Class.forName(argv[++i]).asSubclass(InputFormat.class),
               InputFormat.class);
-          job.setInputFormat(IndirectInputFormat.class);
+          job.setInputFormatClass(IndirectInputFormat.class);
         } else {
           System.out.println("Unexpected argument: " + argv[i]);
           return 0 == printUsage();
@@ -116,7 +122,7 @@
   }
 
   public int run(String [] argv) throws Exception {
-    JobConf job = new JobConf(getConf());
+    Job job = new Job(getConf());
     job.setJarByClass(GenericMRLoadGenerator.class);
     job.setMapperClass(SampleMapper.class);
     job.setReducerClass(SampleReducer.class);
@@ -124,30 +130,31 @@
       return -1;
     }
 
+    Configuration conf = job.getConfiguration();
     if (null == FileOutputFormat.getOutputPath(job)) {
       // No output dir? No writes
-      job.setOutputFormat(NullOutputFormat.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
     }
 
     if (0 == FileInputFormat.getInputPaths(job).length) {
       // No input dir? Generate random data
       System.err.println("No input path; ignoring InputFormat");
       confRandom(job);
-    } else if (null != job.getClass("mapred.indirect.input.format", null)) {
+    } else if (null != conf.getClass("mapred.indirect.input.format", null)) {
       // specified IndirectInputFormat? Build src list
-      JobClient jClient = new JobClient(job);  
+      JobClient jClient = new JobClient(conf);  
       Path sysdir = jClient.getSystemDir();
       Random r = new Random();
       Path indirInputFile = new Path(sysdir,
           Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
-      job.set("mapred.indirect.input.file", indirInputFile.toString());
+      conf.set("mapred.indirect.input.file", indirInputFile.toString());
       SequenceFile.Writer writer = SequenceFile.createWriter(
-          sysdir.getFileSystem(job), job, indirInputFile,
+          sysdir.getFileSystem(conf), conf, indirInputFile,
           LongWritable.class, Text.class,
           SequenceFile.CompressionType.NONE);
       try {
         for (Path p : FileInputFormat.getInputPaths(job)) {
-          FileSystem fs = p.getFileSystem(job);
+          FileSystem fs = p.getFileSystem(conf);
           Stack<Path> pathstack = new Stack<Path>();
           pathstack.push(p);
           while (!pathstack.empty()) {
@@ -171,14 +178,14 @@
 
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    JobClient.runJob(job);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
     Date endTime = new Date();
     System.out.println("Job ended: " + endTime);
     System.out.println("The job took " +
                        (endTime.getTime() - startTime.getTime()) /1000 +
                        " seconds.");
 
-    return 0;
+    return ret;
   }
 
   /**
@@ -190,24 +197,27 @@
     System.exit(res);
   }
 
-  static class RandomInputFormat implements InputFormat {
+  static class RandomInputFormat extends InputFormat<Text, Text> {
 
-    public InputSplit[] getSplits(JobConf conf, int numSplits) {
-      InputSplit[] splits = new InputSplit[numSplits];
+    public List<InputSplit> getSplits(JobContext job) {
+      int numSplits = job.getConfiguration().getInt("mapred.map.tasks", 1);
+      List<InputSplit> splits = new ArrayList<InputSplit>();
       for (int i = 0; i < numSplits; ++i) {
-        splits[i] = new IndirectInputFormat.IndirectSplit(
-            new Path("ignore" + i), 1);
+        splits.add(new IndirectInputFormat.IndirectSplit(
+            new Path("ignore" + i), 1));
       }
       return splits;
     }
 
-    public RecordReader<Text,Text> getRecordReader(InputSplit split,
-        JobConf job, Reporter reporter) throws IOException {
+    public RecordReader<Text,Text> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException {
       final IndirectInputFormat.IndirectSplit clSplit =
         (IndirectInputFormat.IndirectSplit)split;
       return new RecordReader<Text,Text>() {
         boolean once = true;
-        public boolean next(Text key, Text value) {
+        Text key = new Text();
+        Text value = new Text();
+        public boolean nextKeyValue() {
           if (once) {
             key.set(clSplit.getPath().toString());
             once = false;
@@ -215,9 +225,10 @@
           }
           return false;
         }
-        public Text createKey() { return new Text(); }
-        public Text createValue() { return new Text(); }
-        public long getPos() { return 0; }
+        public void initialize(InputSplit split, TaskAttemptContext context) 
+            throws IOException, InterruptedException {}
+        public Text getCurrentKey() { return key; }
+        public Text getCurrentValue() { return value; }
         public void close() { }
         public float getProgress() { return 0.0f; }
       };
@@ -226,8 +237,7 @@
 
   static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
 
-  static class RandomMapOutput extends MapReduceBase
-      implements Mapper<Text,Text,Text,Text> {
+  static class RandomMapOutput extends Mapper<Text,Text,Text,Text> {
     StringBuilder sentence = new StringBuilder();
     int keymin;
     int keymax;
@@ -248,17 +258,18 @@
       return sentence.length();
     }
 
-    public void configure(JobConf job) {
-      bytesToWrite = job.getLong("test.randomtextwrite.bytes_per_map",
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      bytesToWrite = conf.getLong("test.randomtextwrite.bytes_per_map",
                                     1*1024*1024*1024);
-      keymin = job.getInt("test.randomtextwrite.min_words_key", 5);
-      keymax = job.getInt("test.randomtextwrite.max_words_key", 10);
-      valmin = job.getInt("test.randomtextwrite.min_words_value", 5);
-      valmax = job.getInt("test.randomtextwrite.max_words_value", 10);
+      keymin = conf.getInt("test.randomtextwrite.min_words_key", 5);
+      keymax = conf.getInt("test.randomtextwrite.max_words_key", 10);
+      valmin = conf.getInt("test.randomtextwrite.min_words_value", 5);
+      valmax = conf.getInt("test.randomtextwrite.max_words_value", 10);
     }
 
-    public void map(Text key, Text val, OutputCollector<Text,Text> output,
-        Reporter reporter) throws IOException {
+    public void map(Text key, Text val, Context context) 
+        throws IOException, InterruptedException {
       long acc = 0L;
       long recs = 0;
       final int keydiff = keymax - keymin;
@@ -269,14 +280,14 @@
             (0 == keydiff ? 0 : r.nextInt(keydiff)));
         recacc += generateSentence(val, valmin +
             (0 == valdiff ? 0 : r.nextInt(valdiff)));
-        output.collect(key, val);
+        context.write(key, val);
         ++recs;
         acc += recacc;
-        reporter.incrCounter(Counters.BYTES_WRITTEN, recacc);
-        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
-        reporter.setStatus(acc + "/" + (bytesToWrite - acc) + " bytes");
+        context.getCounter(Counters.BYTES_WRITTEN).increment(recacc);
+        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
+        context.setStatus(acc + "/" + (bytesToWrite - acc) + " bytes");
       }
-      reporter.setStatus("Wrote " + recs + " records");
+      context.setStatus("Wrote " + recs + " records");
     }
 
   }
@@ -284,84 +295,95 @@
   /**
    * When no input dir is specified, generate random data.
    */
-  protected static void confRandom(JobConf job)
+  protected static void confRandom(Job job)
       throws IOException {
     // from RandomWriter
-    job.setInputFormat(RandomInputFormat.class);
+    job.setInputFormatClass(RandomInputFormat.class);
     job.setMapperClass(RandomMapOutput.class);
 
-    final ClusterStatus cluster = new JobClient(job).getClusterStatus();
-    int numMapsPerHost = job.getInt("test.randomtextwrite.maps_per_host", 10);
+    Configuration conf = job.getConfiguration();
+    final ClusterStatus cluster = new JobClient(conf).getClusterStatus();
+    int numMapsPerHost = conf.getInt("test.randomtextwrite.maps_per_host", 10);
     long numBytesToWritePerMap =
-      job.getLong("test.randomtextwrite.bytes_per_map", 1*1024*1024*1024);
+      conf.getLong("test.randomtextwrite.bytes_per_map", 1*1024*1024*1024);
     if (numBytesToWritePerMap == 0) {
       throw new IOException(
           "Cannot have test.randomtextwrite.bytes_per_map set to 0");
     }
-    long totalBytesToWrite = job.getLong("test.randomtextwrite.total_bytes",
+    long totalBytesToWrite = conf.getLong("test.randomtextwrite.total_bytes",
          numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
     int numMaps = (int)(totalBytesToWrite / numBytesToWritePerMap);
     if (numMaps == 0 && totalBytesToWrite > 0) {
       numMaps = 1;
-      job.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
+      conf.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
     }
-    job.setNumMapTasks(numMaps);
+    conf.setInt("mapred.map.tasks", numMaps);
   }
 
 
   // Sampling //
-
-  static abstract class SampleMapReduceBase<K extends WritableComparable,
-                                            V extends Writable>
-      extends MapReduceBase {
+  static abstract class SampleMapBase<K extends WritableComparable<?>,
+      V extends Writable> extends Mapper<K, V, K, V> {
 
     private long total;
     private long kept = 0;
     private float keep;
 
-    protected void setKeep(float keep) {
-      this.keep = keep;
-    }
-
-    protected void emit(K key, V val, OutputCollector<K,V> out)
-        throws IOException {
+    public void setup(Context context) {
+      this.keep = context.getConfiguration().
+        getFloat("hadoop.sort.map.keep.percent", (float)100.0) / (float)100.0;
+    }
+    
+    protected void emit(K key, V val, Context context)
+        throws IOException, InterruptedException {
       ++total;
       while((float) kept / total < keep) {
         ++kept;
-        out.collect(key, val);
+        context.write(key, val);
       }
     }
   }
 
-  public static class SampleMapper<K extends WritableComparable, V extends Writable>
-      extends SampleMapReduceBase<K,V> implements Mapper<K,V,K,V> {
+  static abstract class SampleReduceBase<K extends WritableComparable<?>,
+      V extends Writable> extends Reducer<K, V, K, V> {
+    private long total;
+    private long kept = 0;
+    private float keep;
 
-    public void configure(JobConf job) {
-      setKeep(job.getFloat("hadoop.sort.map.keep.percent", (float)100.0) /
-        (float)100.0);
+    public void setup(Context context) {
+      this.keep = context.getConfiguration().getFloat(
+        "hadoop.sort.reduce.keep.percent", (float)100.0) / (float)100.0;
     }
 
-    public void map(K key, V val,
-                    OutputCollector<K,V> output, Reporter reporter)
-        throws IOException {
-      emit(key, val, output);
+    protected void emit(K key, V val, Context context)
+        throws IOException, InterruptedException {
+      ++total;
+      while((float) kept / total < keep) {
+        ++kept;
+        context.write(key, val);
+      }
     }
-
   }
+  
+  public static class SampleMapper<K extends WritableComparable<?>, 
+                                   V extends Writable>
+      extends SampleMapBase<K,V> {
 
-  public static class SampleReducer<K extends WritableComparable, V extends Writable>
-      extends SampleMapReduceBase<K,V> implements Reducer<K,V,K,V> {
-
-    public void configure(JobConf job) {
-      setKeep(job.getFloat("hadoop.sort.reduce.keep.percent", (float)100.0) /
-        (float)100.0);
+    public void map(K key, V val, Context context)
+        throws IOException, InterruptedException {
+      emit(key, val, context);
     }
 
-    public void reduce(K key, Iterator<V> values,
-                       OutputCollector<K,V> output, Reporter reporter)
-        throws IOException {
-      while (values.hasNext()) {
-        emit(key, values.next(), output);
+  }
+
+  public static class SampleReducer<K extends WritableComparable<?>, 
+                                    V extends Writable>
+      extends SampleReduceBase<K,V> {
+
+    public void reduce(K key, Iterable<V> values, Context context)
+        throws IOException, InterruptedException {
+      for (V value : values) {
+        emit(key, value, context);
       }
     }
 
@@ -373,9 +395,9 @@
    * Obscures the InputFormat and location information to simulate maps
    * reading input from arbitrary locations (&quot;indirect&quot; reads).
    */
-  static class IndirectInputFormat implements InputFormat {
+  static class IndirectInputFormat<K, V> extends InputFormat<K, V> {
 
-    static class IndirectSplit implements InputSplit {
+    static class IndirectSplit extends InputSplit {
       Path file;
       long len;
       public IndirectSplit() { }
@@ -398,32 +420,34 @@
       }
     }
 
-    public InputSplit[] getSplits(JobConf job, int numSplits)
+    public List<InputSplit> getSplits(JobContext job)
         throws IOException {
 
-      Path src = new Path(job.get("mapred.indirect.input.file", null));
-      FileSystem fs = src.getFileSystem(job);
+      Configuration conf = job.getConfiguration();
+      Path src = new Path(conf.get("mapred.indirect.input.file", null));
+      FileSystem fs = src.getFileSystem(conf);
 
-      ArrayList<IndirectSplit> splits = new ArrayList<IndirectSplit>(numSplits);
+      List<InputSplit> splits = new ArrayList<InputSplit>();
       LongWritable key = new LongWritable();
       Text value = new Text();
-      for (SequenceFile.Reader sl = new SequenceFile.Reader(fs, src, job);
+      for (SequenceFile.Reader sl = new SequenceFile.Reader(fs, src, conf);
            sl.next(key, value);) {
         splits.add(new IndirectSplit(new Path(value.toString()), key.get()));
       }
 
-      return splits.toArray(new IndirectSplit[splits.size()]);
+      return splits;
     }
 
-    public RecordReader getRecordReader(InputSplit split, JobConf job,
-        Reporter reporter) throws IOException {
-      InputFormat indirIF = (InputFormat)ReflectionUtils.newInstance(
-          job.getClass("mapred.indirect.input.format",
-            SequenceFileInputFormat.class), job);
+    @SuppressWarnings("unchecked")
+    public RecordReader<K, V> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      InputFormat<K, V> indirIF = (InputFormat)ReflectionUtils.newInstance(
+          conf.getClass("mapred.indirect.input.format",
+            SequenceFileInputFormat.class), conf);
       IndirectSplit is = ((IndirectSplit)split);
-      return indirIF.getRecordReader(new FileSplit(is.getPath(), 0,
-            is.getLength(), (String[])null),
-          job, reporter);
+      return indirIF.createRecordReader(new FileSplit(is.getPath(), 0,
+            is.getLength(), (String[])null), context);
     }
   }