Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/04/20 14:27:08 UTC

svn commit: r766670 - in /hadoop/core/trunk: ./ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: sharad
Date: Mon Apr 20 12:27:08 2009
New Revision: 766670

URL: http://svn.apache.org/viewvc?rev=766670&view=rev
Log:
HADOOP-5681. Change examples RandomWriter and RandomTextWriter to use new mapreduce API. Contributed by Amareshwari Sriramadasu.
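
For context: in the old org.apache.hadoop.mapred API a mapper extends
MapReduceBase, reads settings in configure(JobConf), and emits through an
OutputCollector while reporting through a Reporter. The new
org.apache.hadoop.mapreduce API folds all of these into a single
Mapper.Context. A minimal sketch of the pattern applied in the diffs below
(the class name and configuration key are hypothetical, not part of this
commit):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Extends org.apache.hadoop.mapreduce.Mapper directly; no MapReduceBase.
    public class SketchMapper extends Mapper<Text, Text, Text, Text> {

      static enum Counters { RECORDS_WRITTEN }

      private Text tag;

      @Override
      public void setup(Context context) {
        // setup(Context) replaces configure(JobConf); settings are read
        // through the context's Configuration.
        Configuration conf = context.getConfiguration();
        tag = new Text(conf.get("test.sketch.tag", "sketch"));
      }

      @Override
      public void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {
        context.write(tag, value);                                 // was output.collect(...)
        context.getCounter(Counters.RECORDS_WRITTEN).increment(1); // was reporter.incrCounter(...)
        context.setStatus("copied record " + key);                 // was reporter.setStatus(...)
      }
    }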

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomTextWriter.java
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=766670&r1=766669&r2=766670&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Apr 20 12:27:08 2009
@@ -240,6 +240,9 @@
     HADOOP-4490. Provide ability to run tasks as job owners.
     (Sreekanth Ramakrishnan via yhemanth)
 
+    HADOOP-5681. Change examples RandomWriter and RandomTextWriter to 
+    use new mapreduce API. (Amareshwari Sriramadasu via sharad)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomTextWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomTextWriter.java?rev=766670&r1=766669&r2=766670&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomTextWriter.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomTextWriter.java Mon Apr 20 12:27:08 2009
@@ -29,15 +29,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -86,7 +81,7 @@
                        "[-outFormat <output format class>] " + 
                        "<output>");
     ToolRunner.printGenericCommandUsage(System.out);
-    return -1;
+    return 2;
   }
   
   /**
@@ -94,8 +89,7 @@
    */
   static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
 
-  static class Map extends MapReduceBase 
-    implements Mapper<Text, Text, Text, Text> {
+  static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {
     
     private long numBytesToWrite;
     private int minWordsInKey;
@@ -107,18 +101,19 @@
     /**
      * Save the configuration value that we need to write the data.
      */
-    public void configure(JobConf job) {
-      numBytesToWrite = job.getLong("test.randomtextwrite.bytes_per_map",
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      numBytesToWrite = conf.getLong("test.randomtextwrite.bytes_per_map",
                                     1*1024*1024*1024);
       minWordsInKey = 
-        job.getInt("test.randomtextwrite.min_words_key", 5);
+        conf.getInt("test.randomtextwrite.min_words_key", 5);
       wordsInKeyRange = 
-        (job.getInt("test.randomtextwrite.max_words_key", 10) - 
+        (conf.getInt("test.randomtextwrite.max_words_key", 10) - 
          minWordsInKey);
       minWordsInValue = 
-        job.getInt("test.randomtextwrite.min_words_value", 10);
+        conf.getInt("test.randomtextwrite.min_words_value", 10);
       wordsInValueRange = 
-        (job.getInt("test.randomtextwrite.max_words_value", 100) - 
+        (conf.getInt("test.randomtextwrite.max_words_value", 100) - 
          minWordsInValue);
     }
     
@@ -126,8 +121,7 @@
      * Given an output filename, write a bunch of random records to it.
      */
     public void map(Text key, Text value,
-                    OutputCollector<Text, Text> output, 
-                    Reporter reporter) throws IOException {
+                    Context context) throws IOException,InterruptedException {
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         // Generate the key/value 
@@ -139,20 +133,20 @@
         Text valueWords = generateSentence(noWordsValue);
         
         // Write the sentence 
-        output.collect(keyWords, valueWords);
+        context.write(keyWords, valueWords);
         
         numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
         
         // Update counters, progress etc.
-        reporter.incrCounter(Counters.BYTES_WRITTEN, 
-                             (keyWords.getLength()+valueWords.getLength()));
-        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
+        context.getCounter(Counters.BYTES_WRITTEN).increment(
+                  keyWords.getLength() + valueWords.getLength());
+        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
         if (++itemCount % 200 == 0) {
-          reporter.setStatus("wrote record " + itemCount + ". " + 
+          context.setStatus("wrote record " + itemCount + ". " + 
                              numBytesToWrite + " bytes left.");
         }
       }
-      reporter.setStatus("done with " + itemCount + " records.");
+      context.setStatus("done with " + itemCount + " records.");
     }
     
     private Text generateSentence(int noWords) {
@@ -178,33 +172,35 @@
       return printUsage();    
     }
     
-    JobConf job = new JobConf(getConf());
-    
-    job.setJarByClass(RandomTextWriter.class);
-    job.setJobName("random-text-writer");
-    
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(Text.class);
-    
-    job.setInputFormat(RandomWriter.RandomInputFormat.class);
-    job.setMapperClass(Map.class);        
-    
-    JobClient client = new JobClient(job);
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
-    int numMapsPerHost = job.getInt("test.randomtextwrite.maps_per_host", 10);
-    long numBytesToWritePerMap = job.getLong("test.randomtextwrite.bytes_per_map",
+    int numMapsPerHost = conf.getInt("test.randomtextwrite.maps_per_host", 10);
+    long numBytesToWritePerMap = conf.getLong("test.randomtextwrite.bytes_per_map",
                                              1*1024*1024*1024);
     if (numBytesToWritePerMap == 0) {
       System.err.println("Cannot have test.randomtextwrite.bytes_per_map set to 0");
       return -2;
     }
-    long totalBytesToWrite = job.getLong("test.randomtextwrite.total_bytes", 
+    long totalBytesToWrite = conf.getLong("test.randomtextwrite.total_bytes", 
          numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
     int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
     if (numMaps == 0 && totalBytesToWrite > 0) {
       numMaps = 1;
-      job.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
+      conf.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
     }
+    conf.setInt("mapred.map.tasks", numMaps);
+    
+    Job job = new Job(conf);
+    
+    job.setJarByClass(RandomTextWriter.class);
+    job.setJobName("random-text-writer");
+    
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    
+    job.setInputFormatClass(RandomWriter.RandomInputFormat.class);
+    job.setMapperClass(RandomTextMapper.class);        
     
     Class<? extends OutputFormat> outputFormatClass = 
       SequenceFileOutputFormat.class;
@@ -224,10 +220,9 @@
       }
     }
 
-    job.setOutputFormat(outputFormatClass);
+    job.setOutputFormatClass(outputFormatClass);
     FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0)));
     
-    job.setNumMapTasks(numMaps);
     System.out.println("Running " + numMaps + " maps.");
     
     // reducer NONE
@@ -235,14 +230,14 @@
     
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    JobClient.runJob(job);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
     Date endTime = new Date();
     System.out.println("Job ended: " + endTime);
     System.out.println("The job took " + 
                        (endTime.getTime() - startTime.getTime()) /1000 + 
                        " seconds.");
     
-    return 0;
+    return ret;
   }
   
   public static void main(String[] args) throws Exception {

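The driver half of the migration has the same shape: JobConf gives way to a
Job built over a plain Configuration, setInputFormat/setOutputFormat become
setInputFormatClass/setOutputFormatClass, and JobClient.runJob(job) becomes
job.waitForCompletion(true), which returns success as a boolean instead of
throwing on failure. A hypothetical standalone driver condensing the pattern
(a pass-through job relying on the identity Mapper and Reducer defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class NewApiDriverSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The map count travels through the Configuration now; Job has no
        // setNumMapTasks, which is why the diff above sets
        // "mapred.map.tasks" before constructing the Job.
        conf.setInt("mapred.map.tasks", 4);

        Job job = new Job(conf);        // replaces: JobConf job = new JobConf(conf)
        job.setJarByClass(NewApiDriverSketch.class);
        job.setJobName("new-api-sketch");
        job.setOutputKeyClass(LongWritable.class);  // key type of TextInputFormat
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Replaces JobClient.runJob(job); returns false on failure rather
        // than throwing, hence the explicit exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
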
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?rev=766670&r1=766669&r2=766670&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Mon Apr 20 12:27:08 2009
@@ -19,7 +19,9 @@
 package org.apache.hadoop.examples;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -30,19 +32,11 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -93,19 +87,20 @@
    * A custom input format that creates virtual inputs of a single string
    * for each map.
    */
-  static class RandomInputFormat implements InputFormat<Text, Text> {
+  static class RandomInputFormat extends InputFormat<Text, Text> {
 
     /** 
      * Generate the requested number of file splits, with the filename
      * set to the filename of the output file.
      */
-    public InputSplit[] getSplits(JobConf job, 
-                                  int numSplits) throws IOException {
-      InputSplit[] result = new InputSplit[numSplits];
+    public List<InputSplit> getSplits(JobContext job) throws IOException {
+      List<InputSplit> result = new ArrayList<InputSplit>();
       Path outDir = FileOutputFormat.getOutputPath(job);
-      for(int i=0; i < result.length; ++i) {
-        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
-                                  (String[])null);
+      int numSplits = 
+            job.getConfiguration().getInt("mapred.map.tasks", 1);
+      for(int i=0; i < numSplits; ++i) {
+        result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
+                                  (String[])null));
       }
       return result;
     }
@@ -114,43 +109,52 @@
      * Return a single record (filename, "") where the filename is taken from
      * the file split.
      */
-    static class RandomRecordReader implements RecordReader<Text, Text> {
+    static class RandomRecordReader extends RecordReader<Text, Text> {
       Path name;
+      Text key = null;
+      Text value = new Text();
       public RandomRecordReader(Path p) {
         name = p;
       }
-      public boolean next(Text key, Text value) {
+      
+      public void initialize(InputSplit split,
+                             TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    	  
+      }
+      
+      public boolean nextKeyValue() {
         if (name != null) {
+          key = new Text();
           key.set(name.getName());
           name = null;
           return true;
         }
         return false;
       }
-      public Text createKey() {
-        return new Text();
-      }
-      public Text createValue() {
-        return new Text();
+      
+      public Text getCurrentKey() {
+        return key;
       }
-      public long getPos() {
-        return 0;
+      
+      public Text getCurrentValue() {
+        return value;
       }
+      
       public void close() {}
+
       public float getProgress() {
         return 0.0f;
       }
     }
 
-    public RecordReader<Text, Text> getRecordReader(InputSplit split,
-                                        JobConf job, 
-                                        Reporter reporter) throws IOException {
+    public RecordReader<Text, Text> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException, InterruptedException {
       return new RandomRecordReader(((FileSplit) split).getPath());
     }
   }
 
-  static class Map extends MapReduceBase
-    implements Mapper<WritableComparable, Writable,
+  static class RandomMapper extends Mapper<WritableComparable, Writable,
                       BytesWritable, BytesWritable> {
     
     private long numBytesToWrite;
@@ -173,8 +177,7 @@
      */
     public void map(WritableComparable key, 
                     Writable value,
-                    OutputCollector<BytesWritable, BytesWritable> output, 
-                    Reporter reporter) throws IOException {
+                    Context context) throws IOException,InterruptedException {
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         int keyLength = minKeySize + 
@@ -185,16 +188,16 @@
           (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
         randomValue.setSize(valueLength);
         randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
-        output.collect(randomKey, randomValue);
+        context.write(randomKey, randomValue);
         numBytesToWrite -= keyLength + valueLength;
-        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
-        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
+        context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
+        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
         if (++itemCount % 200 == 0) {
-          reporter.setStatus("wrote record " + itemCount + ". " + 
+          context.setStatus("wrote record " + itemCount + ". " + 
                              numBytesToWrite + " bytes left.");
         }
       }
-      reporter.setStatus("done with " + itemCount + " records.");
+      context.setStatus("done with " + itemCount + " records.");
     }
     
     /**
@@ -202,17 +205,17 @@
      * the data.
      */
     @Override
-    public void configure(JobConf job) {
-      numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      numBytesToWrite = conf.getLong("test.randomwrite.bytes_per_map",
                                     1*1024*1024*1024);
-      minKeySize = job.getInt("test.randomwrite.min_key", 10);
+      minKeySize = conf.getInt("test.randomwrite.min_key", 10);
       keySizeRange = 
-        job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
-      minValueSize = job.getInt("test.randomwrite.min_value", 0);
+        conf.getInt("test.randomwrite.max_key", 1000) - minKeySize;
+      minValueSize = conf.getInt("test.randomwrite.min_value", 0);
       valueSizeRange = 
-        job.getInt("test.randomwrite.max_value", 20000) - minValueSize;
+        conf.getInt("test.randomwrite.max_value", 20000) - minValueSize;
     }
-    
   }
   
   /**
@@ -226,42 +229,41 @@
     if (args.length == 0) {
       System.out.println("Usage: writer <out-dir>");
       ToolRunner.printGenericCommandUsage(System.out);
-      return -1;
+      return 2;
     }
     
     Path outDir = new Path(args[0]);
-    JobConf job = new JobConf(getConf());
-    
-    job.setJarByClass(RandomWriter.class);
-    job.setJobName("random-writer");
-    FileOutputFormat.setOutputPath(job, outDir);
-    
-    job.setOutputKeyClass(BytesWritable.class);
-    job.setOutputValueClass(BytesWritable.class);
-    
-    job.setInputFormat(RandomInputFormat.class);
-    job.setMapperClass(Map.class);        
-    job.setReducerClass(IdentityReducer.class);
-    job.setOutputFormat(SequenceFileOutputFormat.class);
-    
-    JobClient client = new JobClient(job);
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
-    int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
-    long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
+    int numMapsPerHost = conf.getInt("test.randomwriter.maps_per_host", 10);
+    long numBytesToWritePerMap = conf.getLong("test.randomwrite.bytes_per_map",
                                              1*1024*1024*1024);
     if (numBytesToWritePerMap == 0) {
       System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
       return -2;
     }
-    long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes", 
+    long totalBytesToWrite = conf.getLong("test.randomwrite.total_bytes", 
          numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
     int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
     if (numMaps == 0 && totalBytesToWrite > 0) {
       numMaps = 1;
-      job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
+      conf.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
     }
+    conf.setInt("mapred.map.tasks", numMaps);
+
+    Job job = new Job(conf);
+    
+    job.setJarByClass(RandomWriter.class);
+    job.setJobName("random-writer");
+    FileOutputFormat.setOutputPath(job, outDir);
+    job.setOutputKeyClass(BytesWritable.class);
+    job.setOutputValueClass(BytesWritable.class);
+    job.setInputFormatClass(RandomInputFormat.class);
+    job.setMapperClass(RandomMapper.class);        
+    job.setReducerClass(Reducer.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
     
-    job.setNumMapTasks(numMaps);
     System.out.println("Running " + numMaps + " maps.");
     
     // reducer NONE
@@ -269,14 +271,14 @@
     
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    JobClient.runJob(job);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
     Date endTime = new Date();
     System.out.println("Job ended: " + endTime);
     System.out.println("The job took " + 
                        (endTime.getTime() - startTime.getTime()) /1000 + 
                        " seconds.");
     
-    return 0;
+    return ret;
   }
   
   public static void main(String[] args) throws Exception {

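The InputFormat rewrite above follows the same contract change:
getSplits(JobConf, int) becomes getSplits(JobContext) returning a
List<InputSplit>, with the split count carried in the configuration, and
getRecordReader becomes createRecordReader, whose reader exposes
nextKeyValue()/getCurrentKey()/getCurrentValue() in place of the old
createKey()/createValue()/next(key, value) protocol. A hypothetical harness
that drives the rewritten RandomInputFormat by hand, mirroring the
framework's read loop (it assumes compilation into
org.apache.hadoop.examples, since RandomInputFormat is package-private, and
uses this era's public JobContext/TaskAttemptContext constructors):

    package org.apache.hadoop.examples;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class RandomInputFormatHarness {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.output.dir", "/tmp/random-out"); // read by FileOutputFormat.getOutputPath()
        conf.setInt("mapred.map.tasks", 2);               // split count read by getSplits()

        RandomWriter.RandomInputFormat format = new RandomWriter.RandomInputFormat();
        JobContext jobContext = new JobContext(conf, new JobID("sketch", 0));
        TaskAttemptContext taskContext =
            new TaskAttemptContext(conf, new TaskAttemptID());

        for (InputSplit split : format.getSplits(jobContext)) {
          RecordReader<Text, Text> reader =
              format.createRecordReader(split, taskContext);
          reader.initialize(split, taskContext);
          while (reader.nextKeyValue()) {          // was: reader.next(key, value)
            System.out.println(reader.getCurrentKey() + " -> '"
                               + reader.getCurrentValue() + "'");
          }
          reader.close();
        }
      }
    }

Each split yields exactly one record whose key is the dummy split's filename
and whose value is empty, which is how the job manufactures one map task per
requested writer without reading any real input.
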
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=766670&r1=766669&r2=766670&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Mon Apr 20 12:27:08 2009
@@ -411,6 +411,18 @@
   }
 
   /**
+   * Build a job client with the given {@link Configuration}, 
+   * and connect to the default {@link JobTracker}.
+   * 
+   * @param conf the configuration.
+   * @throws IOException
+   */
+  public JobClient(Configuration conf) throws IOException {
+    setConf(conf);
+    init(new JobConf(conf));
+  }
+
+  /**
    * Connect to the default {@link JobTracker}.
    * @param conf the job configuration.
    * @throws IOException

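The new constructor exists so that callers holding only a Configuration,
such as the reworked examples above, can query the cluster without first
wrapping it in a JobConf themselves. A minimal sketch of its use:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.ClusterStatus;
    import org.apache.hadoop.mapred.JobClient;

    public class ClusterStatusSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobClient client = new JobClient(conf);   // previously required a JobConf
        ClusterStatus cluster = client.getClusterStatus();
        System.out.println("Task trackers: " + cluster.getTaskTrackers());
      }
    }
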
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=766670&r1=766669&r2=766670&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java Mon Apr 20 12:27:08 2009
@@ -491,7 +491,7 @@
   // Input formats
   /**
    * A custom input format that creates virtual inputs of a single string
-   * for each map. Using {@link RandomWriter} code. 
+   * for each map. 
    */
   public static class RandomInputFormat implements InputFormat<Text, Text> {