You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/04/28 13:27:38 UTC

svn commit: r769339 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/io/ src/examples/org/apache/hadoop/examples/ src/test/org/apache/hadoop/mapred/

Author: sharad
Date: Tue Apr 28 11:27:37 2009
New Revision: 769339

URL: http://svn.apache.org/viewvc?rev=769339&view=rev
Log:
HADOOP-5699. Change PiEstimator to use new mapreduce api. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr 28 11:27:37 2009
@@ -34,6 +34,12 @@
     HADOOP-5681. Change examples RandomWriter and RandomTextWriter to 
     use new mapreduce API. (Amareshwari Sriramadasu via sharad)
 
+    HADOOP-5680. Change org.apache.hadoop.examples.SleepJob to use new 
+    mapreduce api. (Amareshwari Sriramadasu via sharad)
+
+    HADOOP-5699. Change org.apache.hadoop.examples.PiEstimator to use 
+    new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
   NEW FEATURES
 
     HADOOP-4268. Change fsck to use ClientProtocol methods so that the
@@ -256,9 +262,6 @@
     HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed by
     TupleWritable encoding. (Jingkei Ly via cdouglas)
 
-    HADOOP-5680. Change org.apache.hadoop.examples.SleepJob to use new 
-    mapreduce api. (Amareshwari Sriramadasu via sharad)
-
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java Tue Apr 28 11:27:37 2009
@@ -100,9 +100,7 @@
 
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
-      boolean a = (readInt(b1, s1) == 1) ? true : false;
-      boolean b = (readInt(b2, s2) == 1) ? true : false;
-      return ((a == b) ? 0 : (a == false) ? -1 : 1);
+      return compareBytes(b1, s1, l1, b2, s2, l2);
     }
   }
 

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java Tue Apr 28 11:27:37 2009
@@ -20,8 +20,8 @@
 
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,17 +31,11 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -137,19 +131,18 @@
    * Generate points in a unit square
    * and then count points inside/outside of the inscribed circle of the square.
    */
-  public static class PiMapper extends MapReduceBase
-    implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
+  public static class PiMapper extends 
+      Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
 
     /** Map method.
      * @param offset samples starting from the (offset+1)th sample.
      * @param size the number of samples for this map
-     * @param out output {ture->numInside, false->numOutside}
-     * @param reporter
+     * @param context output {ture->numInside, false->numOutside}
      */
     public void map(LongWritable offset,
                     LongWritable size,
-                    OutputCollector<BooleanWritable, LongWritable> out,
-                    Reporter reporter) throws IOException {
+                    Context context) 
+        throws IOException, InterruptedException {
 
       final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
       long numInside = 0L;
@@ -171,13 +164,13 @@
         //report status
         i++;
         if (i % 1000 == 0) {
-          reporter.setStatus("Generated " + i + " samples.");
+          context.setStatus("Generated " + i + " samples.");
         }
       }
 
       //output map results
-      out.collect(new BooleanWritable(true), new LongWritable(numInside));
-      out.collect(new BooleanWritable(false), new LongWritable(numOutside));
+      context.write(new BooleanWritable(true), new LongWritable(numInside));
+      context.write(new BooleanWritable(false), new LongWritable(numOutside));
     }
   }
 
@@ -185,34 +178,29 @@
    * Reducer class for Pi estimation.
    * Accumulate points inside/outside results from the mappers.
    */
-  public static class PiReducer extends MapReduceBase
-    implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
+  public static class PiReducer extends 
+      Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
     
     private long numInside = 0;
     private long numOutside = 0;
-    private JobConf conf; //configuration for accessing the file system
       
-    /** Store job configuration. */
-    @Override
-    public void configure(JobConf job) {
-      conf = job;
-    }
-
     /**
      * Accumulate number of points inside/outside results from the mappers.
      * @param isInside Is the points inside? 
      * @param values An iterator to a list of point counts
-     * @param output dummy, not used here.
-     * @param reporter
+     * @param context dummy, not used here.
      */
     public void reduce(BooleanWritable isInside,
-                       Iterator<LongWritable> values,
-                       OutputCollector<WritableComparable<?>, Writable> output,
-                       Reporter reporter) throws IOException {
+        Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
       if (isInside.get()) {
-        for(; values.hasNext(); numInside += values.next().get());
+        for (LongWritable val : values) {
+          numInside += val.get();
+        }
       } else {
-        for(; values.hasNext(); numOutside += values.next().get());
+        for (LongWritable val : values) {
+          numOutside += val.get();
+        }
       }
     }
 
@@ -220,10 +208,11 @@
      * Reduce task done, write output to a file.
      */
     @Override
-    public void close() throws IOException {
+    public void cleanup(Context context) throws IOException {
       //write output to a file
       Path outDir = new Path(TMP_DIR, "out");
       Path outFile = new Path(outDir, "reduce-out");
+      Configuration conf = context.getConfiguration();
       FileSystem fileSys = FileSystem.get(conf);
       SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
           outFile, LongWritable.class, LongWritable.class, 
@@ -238,34 +227,35 @@
    *
    * @return the estimated value of Pi
    */
-  public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
-      ) throws IOException {
+  public static BigDecimal estimate(int numMaps, long numPoints, Configuration conf
+      ) throws IOException, ClassNotFoundException, InterruptedException {
+    Job job = new Job(conf);
     //setup job conf
-    jobConf.setJobName(PiEstimator.class.getSimpleName());
+    job.setJobName(PiEstimator.class.getSimpleName());
+    job.setJarByClass(PiEstimator.class);
 
-    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
 
-    jobConf.setOutputKeyClass(BooleanWritable.class);
-    jobConf.setOutputValueClass(LongWritable.class);
-    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(BooleanWritable.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
 
-    jobConf.setMapperClass(PiMapper.class);
-    jobConf.setNumMapTasks(numMaps);
+    job.setMapperClass(PiMapper.class);
 
-    jobConf.setReducerClass(PiReducer.class);
-    jobConf.setNumReduceTasks(1);
+    job.setReducerClass(PiReducer.class);
+    job.setNumReduceTasks(1);
 
     // turn off speculative execution, because DFS doesn't handle
     // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
+    job.setSpeculativeExecution(false);
 
     //setup input/output directories
     final Path inDir = new Path(TMP_DIR, "in");
     final Path outDir = new Path(TMP_DIR, "out");
-    FileInputFormat.setInputPaths(jobConf, inDir);
-    FileOutputFormat.setOutputPath(jobConf, outDir);
+    FileInputFormat.setInputPaths(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
 
-    final FileSystem fs = FileSystem.get(jobConf);
+    final FileSystem fs = FileSystem.get(conf);
     if (fs.exists(TMP_DIR)) {
       throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
           + " already exists.  Please remove it first.");
@@ -281,7 +271,7 @@
         final LongWritable offset = new LongWritable(i * numPoints);
         final LongWritable size = new LongWritable(numPoints);
         final SequenceFile.Writer writer = SequenceFile.createWriter(
-            fs, jobConf, file,
+            fs, conf, file,
             LongWritable.class, LongWritable.class, CompressionType.NONE);
         try {
           writer.append(offset, size);
@@ -294,7 +284,7 @@
       //start a map/reduce job
       System.out.println("Starting Job");
       final long startTime = System.currentTimeMillis();
-      JobClient.runJob(jobConf);
+      job.waitForCompletion(true);
       final double duration = (System.currentTimeMillis() - startTime)/1000.0;
       System.out.println("Job Finished in " + duration + " seconds");
 
@@ -302,7 +292,7 @@
       Path inFile = new Path(outDir, "reduce-out");
       LongWritable numInside = new LongWritable();
       LongWritable numOutside = new LongWritable();
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, conf);
       try {
         reader.next(numInside, numOutside);
       } finally {
@@ -329,7 +319,7 @@
     if (args.length != 2) {
       System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
       ToolRunner.printGenericCommandUsage(System.err);
-      return -1;
+      return 2;
     }
     
     final int nMaps = Integer.parseInt(args[0]);
@@ -338,9 +328,8 @@
     System.out.println("Number of Maps  = " + nMaps);
     System.out.println("Samples per Map = " + nSamples);
         
-    final JobConf jobConf = new JobConf(getConf(), getClass());
     System.out.println("Estimated value of Pi is "
-        + estimate(nMaps, nSamples, jobConf));
+        + estimate(nMaps, nSamples, getConf()));
     return 0;
   }
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java Tue Apr 28 11:27:37 2009
@@ -19,7 +19,6 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.BooleanWritable.Comparator;
 import junit.framework.TestCase;
 import java.io.*;
 import java.util.*;
@@ -237,7 +236,8 @@
       return -super.compare(b1, s1, l1, b2, s2, l2);
     }
     static {                    // register this comparator
-      WritableComparator.define(DecreasingIntComparator.class, new Comparator());
+      WritableComparator.define(DecreasingIntComparator.class,
+                                new IntWritable.Comparator());
     }
   }
 
@@ -268,7 +268,8 @@
     }
     
     static {
-      WritableComparator.define(CompositeIntGroupFn.class, new Comparator());
+      WritableComparator.define(CompositeIntGroupFn.class, 
+                                new IntWritable.Comparator());
     }
   }
 
@@ -284,7 +285,8 @@
     }
     
     static {
-      WritableComparator.define(CompositeIntReverseGroupFn.class, new Comparator());
+      WritableComparator.define(CompositeIntReverseGroupFn.class, 
+                                new IntWritable.Comparator());
     }
   }
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Apr 28 11:27:37 2009
@@ -45,7 +45,8 @@
     new File(System.getProperty("test.build.data","/tmp"))
     .toURI().toString().replace(' ', '+');
     
-  public void testWithLocal() throws IOException {
+  public void testWithLocal() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     MiniMRCluster mr = null;
     try {
       mr = new MiniMRCluster(2, "file:///", 3);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Apr 28 11:27:37 2009
@@ -169,7 +169,8 @@
     }
   }
 
-  public static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+  public static void runPI(MiniMRCluster mr, JobConf jobconf) 
+      throws IOException, InterruptedException, ClassNotFoundException {
     LOG.info("runPI");
     double estimate = org.apache.hadoop.examples.PiEstimator.estimate(
         NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
@@ -232,7 +233,8 @@
     }
   }
 
-  public void testWithDFS() throws IOException {
+  public void testWithDFS() 
+      throws IOException, InterruptedException, ClassNotFoundException {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     FileSystem fileSys = null;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java Tue Apr 28 11:27:37 2009
@@ -55,7 +55,8 @@
     }
   }
   
-  static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+  static void runPI(MiniMRCluster mr, JobConf jobconf) 
+      throws IOException, InterruptedException, ClassNotFoundException  {
     LOG.info("runPI");
     double estimate = PiEstimator.estimate(NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
     double error = Math.abs(Math.PI - estimate);
@@ -66,7 +67,8 @@
    * Run the pi test with a specifix value of 
    * mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded.
    */
-  private boolean runOneTest(int maxTasks) throws IOException {
+  private boolean runOneTest(int maxTasks) 
+      throws IOException, InterruptedException, ClassNotFoundException {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     FileSystem fileSys = null;
@@ -96,7 +98,8 @@
     return success;
   }
 
-  public void testTaskLimits() throws IOException {
+  public void testTaskLimits() 
+      throws IOException, InterruptedException, ClassNotFoundException {
 
     System.out.println("Job 1 running with max set to 2");
     boolean status = runOneTest(2);