Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/02 20:48:55 UTC

svn commit: r399011 - in /lucene/hadoop/trunk: CHANGES.txt src/examples/org/apache/hadoop/examples/ExampleDriver.java src/examples/org/apache/hadoop/examples/RandomWriter.java src/examples/org/apache/hadoop/examples/Sort.java

Author: cutting
Date: Tue May  2 11:48:52 2006
New Revision: 399011

URL: http://svn.apache.org/viewcvs?rev=399011&view=rev
Log:
HADOOP-187.  Add RandomWriter and Sort examples.  Contributed by Owen.

Added:
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=399011&r1=399010&r2=399011&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue May  2 11:48:52 2006
@@ -147,6 +147,10 @@
     loop.  Also improve calculation of time to send next heartbeat.
     (omalley via cutting)
 
+39. HADOOP-187.  Add two MapReduce examples/benchmarks.  One creates
+    files containing random data.  The second sorts the output of the
+    first.  (omalley via cutting)
+
 Release 0.1.1 - 2006-04-08
 
  1. Added CHANGES.txt, logging all significant changes to Hadoop.  (cutting)

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=399011&r1=399010&r2=399011&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Tue May  2 11:48:52 2006
@@ -103,6 +103,10 @@
     "A map/reduce program that counts the words in the input files."));
     programs.put("grep", new ProgramDescription(Grep.class,
     "A map/reduce program that counts the matches of a regex in the input."));
+    programs.put("sort", new ProgramDescription(Sort.class,
+        "Sort binary keys and values."));
+    programs.put("writer", new ProgramDescription(RandomWriter.class,
+        "Write random binary key/value pairs"));
     
     // Make sure they gave us a program name.
     if (args.length == 0) {
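
For reference, the four added lines above are all the wiring the driver needs: each example is registered under a short name together with a one-line description, and the driver looks the name up and dispatches the remaining command-line arguments to that program. A minimal, self-contained sketch of that registration-and-dispatch pattern follows; SimpleProgramDescription and MiniDriver are illustrative stand-ins, not the driver's actual helper classes, and only the two programs added by this commit are registered.

package org.apache.hadoop.examples;

import java.util.Map;
import java.util.TreeMap;

// Illustrative stand-in for the driver's description helper: pairs a
// program's main class with a one-line description and can invoke it.
class SimpleProgramDescription {
  final Class<?> mainClass;
  final String description;

  SimpleProgramDescription(Class<?> mainClass, String description) {
    this.mainClass = mainClass;
    this.description = description;
  }

  // Call the program's static main(String[]) with the remaining arguments.
  void invoke(String[] args) throws Exception {
    mainClass.getMethod("main", String[].class).invoke(null, (Object) args);
  }
}

public class MiniDriver {
  public static void main(String[] args) throws Exception {
    Map<String, SimpleProgramDescription> programs =
      new TreeMap<String, SimpleProgramDescription>();
    programs.put("sort", new SimpleProgramDescription(Sort.class,
        "Sort binary keys and values."));
    programs.put("writer", new SimpleProgramDescription(RandomWriter.class,
        "Write random binary key/value pairs."));

    // Make sure a known program name was given; otherwise list what exists.
    if (args.length == 0 || !programs.containsKey(args[0])) {
      System.out.println("Valid program names are:");
      for (Map.Entry<String, SimpleProgramDescription> e : programs.entrySet()) {
        System.out.println("  " + e.getKey() + ": " + e.getValue().description);
      }
      return;
    }

    // Strip the program name and pass everything else through.
    String[] rest = new String[args.length - 1];
    System.arraycopy(args, 1, rest, 0, rest.length);
    programs.get(args[0]).invoke(rest);
  }
}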

Added: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?rev=399011&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (added)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Tue May  2 11:48:52 2006
@@ -0,0 +1,210 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This program uses map/reduce to run a distributed job where there is
+ * no interaction between the tasks and each task writes a large unsorted
+ * random binary sequence file of BytesWritable.
+ * 
+ * @author Owen O'Malley
+ */
+public class RandomWriter extends MapReduceBase implements Reducer {
+  
+  public static class Map extends MapReduceBase implements Mapper {
+    private FileSystem fileSys = null;
+    private long numBytesToWrite;
+    private int minKeySize;
+    private int keySizeRange;
+    private int minValueSize;
+    private int valueSizeRange;
+    private Random random = new Random();
+    private BytesWritable randomKey = new BytesWritable();
+    private BytesWritable randomValue = new BytesWritable();
+    
+    private void randomizeBytes(byte[] data, int offset, int length) {
+      for(int i=offset + length - 1; i >= offset; --i) {
+        data[i] = (byte) random.nextInt(256);
+      }
+    }
+    
+    /**
+     * Given an output filename, write a bunch of random records to it.
+     */
+    public void map(WritableComparable key, 
+                    Writable value,
+                    OutputCollector output, 
+                    Reporter reporter) throws IOException {
+      String filename = ((UTF8) value).toString();
+      SequenceFile.Writer writer = 
+        new SequenceFile.Writer(fileSys, new Path(filename), 
+                                BytesWritable.class, BytesWritable.class);
+      int itemCount = 0;
+      while (numBytesToWrite > 0) {
+        int keyLength = random.nextInt(keySizeRange) + minKeySize;
+        randomKey.setSize(keyLength);
+        randomizeBytes(randomKey.get(), 0, randomKey.getSize());
+        int valueLength = random.nextInt(valueSizeRange) + minValueSize;
+        randomValue.setSize(valueLength);
+        randomizeBytes(randomValue.get(), 0, randomValue.getSize());
+        writer.append(randomKey, randomValue);
+        numBytesToWrite -= keyLength + valueLength;
+        if (++itemCount % 200 == 0) {
+          reporter.setStatus("wrote record " + itemCount + ". " + 
+                             numBytesToWrite + " bytes left.");
+        }
+      }
+      reporter.setStatus("done with " + itemCount + " records.");
+      writer.close();
+    }
+    
+    /**
+     * Save the values out of the configuration that we need to write
+     * the data.
+     */
+    public void configure(JobConf job) {
+      try {
+        fileSys = FileSystem.get(job);
+      } catch (IOException e) {
+        throw new RuntimeException("Can't get default file system", e);
+      }
+      numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
+                                       1*1024*1024*1024);
+      minKeySize = job.getInt("test.randomwrite.min_key", 10);
+      keySizeRange = 
+        job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
+      minValueSize = job.getInt("test.randomwrite.min_value", 0);
+      valueSizeRange = 
+        job.getInt("test.randomwrite.max_value", 20000) - minValueSize;
+    }
+    
+  }
+  
+  public void reduce(WritableComparable key, 
+                     Iterator values,
+                     OutputCollector output, 
+                     Reporter reporter) throws IOException {
+    // nothing
+  }
+  
+  /**
+   * This is the main routine for launching a distributed random write job.
+   * It runs 10 maps/node and each map writes 1 gig of data to a DFS file.
+   * The reduce doesn't do anything.
+   * 
+   * This program uses a useful pattern for dealing with Hadoop's constraints
+   * on InputSplits. Since each input split can only consist of a file and 
+   * byte range and we want to control how many maps there are (and we don't 
+   * really have any inputs), we create a directory with a set of artificial
+   * files that each contain the filename that we want a given map to write 
+   * to. Then, using the text line reader and this "fake" input directory, we
+   * generate exactly the right number of maps. Each map gets a single record
+   * that is the filename it is supposed to write its output to. 
+   * @throws IOException 
+   */
+  public static void main(String[] args) throws IOException {
+    Configuration defaults = new Configuration();
+    if (args.length == 0) {
+      System.out.println("Usage: writer <out-dir> [<config>]");
+      return;
+    }
+    Path outDir = new Path(args[0]);
+    if (args.length >= 2) {
+      defaults.addFinalResource(new Path(args[1]));
+    }
+    
+    JobConf jobConf = new JobConf(defaults, RandomWriter.class);
+    jobConf.setJobName("random-writer");
+    
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobConf.setSpeculativeExecution(false);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    
+    jobConf.setMapperClass(Map.class);        
+    jobConf.setReducerClass(RandomWriter.class);
+    
+    JobClient client = new JobClient(jobConf);
+    ClusterStatus cluster = client.getClusterStatus();
+    int numMaps = cluster.getTaskTrackers() * 
+         jobConf.getInt("test.randomwriter.maps_per_host", 10);
+    jobConf.setNumMapTasks(numMaps);
+    System.out.println("Running " + numMaps + " maps.");
+    jobConf.setNumReduceTasks(1);
+    
+    Path tmpDir = new Path("random-work");
+    Path inDir = new Path(tmpDir, "in");
+    Path fakeOutDir = new Path(tmpDir, "out");
+    FileSystem fileSys = FileSystem.get(jobConf);
+    fileSys.delete(tmpDir);
+    fileSys.delete(outDir);
+    fileSys.mkdirs(inDir);
+    NumberFormat numberFormat = NumberFormat.getInstance();
+    numberFormat.setMinimumIntegerDigits(6);
+    numberFormat.setGroupingUsed(false);
+
+    for(int i=0; i < numMaps; ++i) {
+      Path file = new Path(inDir, "part"+i);
+      FSDataOutputStream writer = fileSys.create(file);
+      writer.writeBytes(outDir + "/part" + numberFormat.format(i)+ "\n");
+      writer.close();
+    }
+    jobConf.setInputPath(inDir);
+    jobConf.setOutputPath(fakeOutDir);
+    
+    // Uncomment to run locally in a single process
+    //jobConf.set("mapred.job.tracker", "local");
+    
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    try {
+      JobClient.runJob(jobConf);
+      Date endTime = new Date();
+      System.out.println("Job ended: " + endTime);
+      System.out.println("The job took " + 
+         (endTime.getTime() - startTime.getTime()) /1000 + " seconds.");
+    } finally {
+      fileSys.delete(tmpDir);
+    }
+  }
+  
+}
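
A quick way to sanity-check RandomWriter's output is to re-read one of the generated sequence files and total up its records and key/value bytes. The sketch below is not part of this commit; it assumes a SequenceFile.Reader constructor taking a FileSystem, Path, and Configuration, and otherwise uses only the BytesWritable get()/getSize() accessors the writer itself relies on.

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;

// Hypothetical helper, not part of this commit: scans one RandomWriter output
// file and reports how many key/value pairs and payload bytes it contains.
public class RandomWriterCheck {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fileSys = FileSystem.get(conf);
    SequenceFile.Reader reader =
      new SequenceFile.Reader(fileSys, new Path(args[0]), conf);
    BytesWritable key = new BytesWritable();
    BytesWritable value = new BytesWritable();
    long records = 0;
    long bytes = 0;
    while (reader.next(key, value)) {
      records += 1;
      bytes += key.getSize() + value.getSize();
    }
    reader.close();
    System.out.println(records + " records, " + bytes +
                       " bytes of key/value data in " + args[0]);
  }
}

With the defaults above (test.randomwrite.bytes_per_map of 1 GB, keys of roughly 10-1000 bytes, values of roughly 0-20000 bytes), each map's output file should report slightly over a gigabyte of key/value data.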

Added: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java?rev=399011&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java (added)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java Tue May  2 11:48:52 2006
@@ -0,0 +1,122 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.fs.*;
+
+/**
+ * This is the trivial map/reduce program that does absolutely nothing
+ * other than use the framework to fragment and sort the input values.
+ *
+ * To run: bin/hadoop jar build/hadoop-examples.jar sort
+ *            [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i> 
+ *
+ * @author Owen O'Malley
+ */
+public class Sort {
+  
+  static void printUsage() {
+    System.out.println("sort [-m <maps>] [-r <reduces>] <input> <output>");
+    System.exit(1);
+  }
+  
+  /**
+   * The main driver for the sort program.
+   * Invoke this method to submit the map/reduce job.
+   * @throws IOException When there are communication problems with the 
+   *                     job tracker.
+   */
+  public static void main(String[] args) throws IOException {
+    Configuration defaults = new Configuration();
+    
+    JobConf jobConf = new JobConf(defaults, Sort.class);
+    jobConf.setJobName("sorter");
+ 
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+   
+    jobConf.setInputKeyClass(BytesWritable.class);
+    jobConf.setInputValueClass(BytesWritable.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    
+    jobConf.setMapperClass(IdentityMapper.class);        
+    jobConf.setReducerClass(IdentityReducer.class);
+    
+    JobClient client = new JobClient(jobConf);
+    ClusterStatus cluster = client.getClusterStatus();
+    int num_maps = cluster.getTaskTrackers() * 
+         jobConf.getInt("test.sort.maps_per_host", 10);
+    int num_reduces = cluster.getTaskTrackers() * 
+        jobConf.getInt("test.sort.reduces_per_host", 10);
+    List otherArgs = new ArrayList();
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-m".equals(args[i])) {
+          num_maps = Integer.parseInt(args[++i]);
+        } else if ("-r".equals(args[i])) {
+          num_reduces = Integer.parseInt(args[++i]);
+        } else {
+          otherArgs.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from " +
+                           args[i-1]);
+        printUsage(); // exits
+      }
+    }
+    
+    jobConf.setNumMapTasks(num_maps);
+    jobConf.setNumReduceTasks(num_reduces);
+    
+    // Make sure there are exactly 2 parameters left.
+    if (otherArgs.size() != 2) {
+      System.out.println("ERROR: Wrong number of parameters: " +
+          otherArgs.size() + " instead of 2.");
+      printUsage();
+    }
+    jobConf.setInputPath(new Path((String) otherArgs.get(0)));
+    jobConf.setOutputPath(new Path((String) otherArgs.get(1)));
+    
+    // Uncomment to run locally in a single process
+    //jobConf.set("mapred.job.tracker", "local");
+    
+    System.out.println("Running on " +
+        cluster.getTaskTrackers() +
+        " nodes to sort from " + 
+        jobConf.getInputPaths()[0] + " into " +
+        jobConf.getOutputPath() + ".");
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    JobClient.runJob(jobConf);
+    Date end_time = new Date();
+    System.out.println("Job ended: " + end_time);
+    System.out.println("The job took " + 
+       (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+  }
+  
+}
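
A similar standalone check can confirm that Sort's output is actually ordered: scan one reduce output file and verify that consecutive keys are non-decreasing. Like the RandomWriter check, this is not part of the commit and assumes the same SequenceFile.Reader constructor; key ordering is checked with BytesWritable's compareTo. Note that each reduce task writes its own part file, so every part is internally sorted but the parts are not merged into a single ordered file.

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;

// Hypothetical helper, not part of this commit: reads one Sort output file
// and checks that its keys appear in non-decreasing order.
public class SortCheck {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fileSys = FileSystem.get(conf);
    SequenceFile.Reader reader =
      new SequenceFile.Reader(fileSys, new Path(args[0]), conf);
    BytesWritable key = new BytesWritable();
    BytesWritable value = new BytesWritable();
    BytesWritable prev = new BytesWritable();
    boolean first = true;
    long records = 0;
    while (reader.next(key, value)) {
      if (!first && prev.compareTo(key) > 0) {
        System.out.println("Out of order at record " + records);
        reader.close();
        return;
      }
      // Keep a copy of the current key to compare against the next one.
      prev.setSize(key.getSize());
      System.arraycopy(key.get(), 0, prev.get(), 0, key.getSize());
      first = false;
      records += 1;
    }
    reader.close();
    System.out.println(records + " records in order in " + args[0]);
  }
}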