Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/02 20:48:55 UTC
svn commit: r399011 - in /lucene/hadoop/trunk: CHANGES.txt
src/examples/org/apache/hadoop/examples/ExampleDriver.java
src/examples/org/apache/hadoop/examples/RandomWriter.java
src/examples/org/apache/hadoop/examples/Sort.java
Author: cutting
Date: Tue May 2 11:48:52 2006
New Revision: 399011
URL: http://svn.apache.org/viewcvs?rev=399011&view=rev
Log:
HADOOP-187. Add RandomWriter and Sort examples. Contributed by Owen.
Added:
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=399011&r1=399010&r2=399011&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue May 2 11:48:52 2006
@@ -147,6 +147,10 @@
loop. Also improve calculation of time to send next heartbeat.
(omalley via cutting)
+39. HADOOP-187. Add two MapReduce examples/benchmarks. One creates
+ files containing random data. The second sorts the output of the
+ first. (omalley via cutting)
+
Release 0.1.1 - 2006-04-08
1. Added CHANGES.txt, logging all significant changes to Hadoop. (cutting)
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=399011&r1=399010&r2=399011&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Tue May 2 11:48:52 2006
@@ -103,6 +103,10 @@
"A map/reduce program that counts the words in the input files."));
programs.put("grep", new ProgramDescription(Grep.class,
"A map/reduce program that counts the matches of a regex in the input."));
+ programs.put("sort", new ProgramDescription(Sort.class,
+ "Sort binary keys and values."));
+ programs.put("writer", new ProgramDescription(RandomWriter.class,
+ "Write random binary key/value pairs"));
// Make sure they gave us a program name.
if (args.length == 0) {
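With these two entries registered, either example can be launched through the driver. Assuming the examples jar builds to build/hadoop-examples.jar (the path the Sort javadoc below uses) and picking illustrative DFS directory names, the invocations would look like:

  bin/hadoop jar build/hadoop-examples.jar writer rand-out
  bin/hadoop jar build/hadoop-examples.jar sort rand-out sorted-out

Here rand-out and sorted-out are hypothetical paths chosen for the example, not names anything in the patch depends on.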
Added: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?rev=399011&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (added)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Tue May 2 11:48:52 2006
@@ -0,0 +1,210 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This program uses map/reduce to run a distributed job in which there is
+ * no interaction between the tasks; each task writes a large unsorted
+ * random binary SequenceFile of BytesWritable key/value pairs.
+ *
+ * @author Owen O'Malley
+ */
+public class RandomWriter extends MapReduceBase implements Reducer {
+
+ public static class Map extends MapReduceBase implements Mapper {
+ private FileSystem fileSys = null;
+ private long numBytesToWrite;
+ private int minKeySize;
+ private int keySizeRange;
+ private int minValueSize;
+ private int valueSizeRange;
+ private Random random = new Random();
+ private BytesWritable randomKey = new BytesWritable();
+ private BytesWritable randomValue = new BytesWritable();
+
+ private void randomizeBytes(byte[] data, int offset, int length) {
+ for(int i=offset + length - 1; i >= offset; --i) {
+ data[i] = (byte) random.nextInt(256);
+ }
+ }
+
+ /**
+ * Given an output filename, write a bunch of random records to it.
+ */
+ public void map(WritableComparable key,
+ Writable value,
+ OutputCollector output,
+ Reporter reporter) throws IOException {
+ String filename = ((UTF8) value).toString();
+ SequenceFile.Writer writer =
+ new SequenceFile.Writer(fileSys, new Path(filename),
+ BytesWritable.class, BytesWritable.class);
+ int itemCount = 0;
+ while (numBytesToWrite > 0) {
+ int keyLength = random.nextInt(keySizeRange) + minKeySize;
+ randomKey.setSize(keyLength);
+ randomizeBytes(randomKey.get(), 0, randomKey.getSize());
+ int valueLength = random.nextInt(valueSizeRange) + minValueSize;
+ randomValue.setSize(valueLength);
+ randomizeBytes(randomValue.get(), 0, randomValue.getSize());
+ writer.append(randomKey, randomValue);
+ numBytesToWrite -= keyLength + valueLength;
+ if (++itemCount % 200 == 0) {
+ reporter.setStatus("wrote record " + itemCount + ". " +
+ numBytesToWrite + " bytes left.");
+ }
+ }
+ reporter.setStatus("done with " + itemCount + " records.");
+ writer.close();
+ }
+
+ /**
+ * Save the values out of the configuration that we need to write
+ * the data.
+ */
+ public void configure(JobConf job) {
+ try {
+ fileSys = FileSystem.get(job);
+ } catch (IOException e) {
+ throw new RuntimeException("Can't get default file system", e);
+ }
+ numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
+ 1*1024*1024*1024);
+ minKeySize = job.getInt("test.randomwrite.min_key", 10);
+ keySizeRange =
+ job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
+ minValueSize = job.getInt("test.randomwrite.min_value", 0);
+ valueSizeRange =
+ job.getInt("test.randomwrite.max_value", 20000) - minValueSize;
+ }
+
+ }
+
+ public void reduce(WritableComparable key,
+ Iterator values,
+ OutputCollector output,
+ Reporter reporter) throws IOException {
+ // nothing
+ }
+
+ /**
+ * This is the main routine for launching a distributed random write job.
+ * It runs 10 maps per node, and each map writes 1 gig of data to a DFS file.
+ * The reduce doesn't do anything.
+ *
+ * This program uses a useful pattern for dealing with Hadoop's constraints
+ * on InputSplits. Since each input split can only consist of a file and a
+ * byte range, and we want to control how many maps there are (and we don't
+ * really have any inputs), we create a directory with a set of artificial
+ * files that each contain the filename that we want a given map to write
+ * to. Then, using the text line reader and this "fake" input directory, we
+ * generate exactly the right number of maps. Each map gets a single record
+ * that is the filename it is supposed to write its output to.
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException {
+ Configuration defaults = new Configuration();
+ if (args.length == 0) {
+ System.out.println("Usage: writer <out-dir> [<config>]");
+ return;
+ }
+ Path outDir = new Path(args[0]);
+ if (args.length >= 2) {
+ defaults.addFinalResource(new Path(args[1]));
+ }
+
+ JobConf jobConf = new JobConf(defaults, RandomWriter.class);
+ jobConf.setJobName("random-writer");
+
+ // turn off speculative execution, because DFS doesn't handle
+ // multiple writers to the same file.
+ jobConf.setSpeculativeExecution(false);
+ jobConf.setOutputKeyClass(BytesWritable.class);
+ jobConf.setOutputValueClass(BytesWritable.class);
+
+ jobConf.setMapperClass(Map.class);
+ jobConf.setReducerClass(RandomWriter.class);
+
+ JobClient client = new JobClient(jobConf);
+ ClusterStatus cluster = client.getClusterStatus();
+ int numMaps = cluster.getTaskTrackers() *
+ jobConf.getInt("test.randomwriter.maps_per_host", 10);
+ jobConf.setNumMapTasks(numMaps);
+ System.out.println("Running " + numMaps + " maps.");
+ jobConf.setNumReduceTasks(1);
+
+ Path tmpDir = new Path("random-work");
+ Path inDir = new Path(tmpDir, "in");
+ Path fakeOutDir = new Path(tmpDir, "out");
+ FileSystem fileSys = FileSystem.get(jobConf);
+ fileSys.delete(tmpDir);
+ fileSys.delete(outDir);
+ fileSys.mkdirs(inDir);
+ NumberFormat numberFormat = NumberFormat.getInstance();
+ numberFormat.setMinimumIntegerDigits(6);
+ numberFormat.setGroupingUsed(false);
+
+ for(int i=0; i < numMaps; ++i) {
+ Path file = new Path(inDir, "part"+i);
+ FSDataOutputStream writer = fileSys.create(file);
+ writer.writeBytes(outDir + "/part" + numberFormat.format(i)+ "\n");
+ writer.close();
+ }
+ jobConf.setInputPath(inDir);
+ jobConf.setOutputPath(fakeOutDir);
+
+ // Uncomment to run locally in a single process
+ //jobConf.set("mapred.job.tracker", "local");
+
+ Date startTime = new Date();
+ System.out.println("Job started: " + startTime);
+ try {
+ JobClient.runJob(jobConf);
+ Date endTime = new Date();
+ System.out.println("Job ended: " + endTime);
+ System.out.println("The job took " +
+ (endTime.getTime() - startTime.getTime()) /1000 + " seconds.");
+ } finally {
+ fileSys.delete(tmpDir);
+ }
+ }
+
+}
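RandomWriter's knobs all come out of the job configuration in configure() above, and main() loads the optional second argument with addFinalResource, so the defaults can be overridden from a standard Hadoop configuration file. A minimal sketch of such a file, with a hypothetical name and illustrative values (only the property names are taken from the code):

  <?xml version="1.0"?>
  <!-- random-writer.xml: hypothetical overrides for the RandomWriter knobs -->
  <configuration>
    <property>
      <name>test.randomwrite.bytes_per_map</name>
      <value>134217728</value>  <!-- 128 MB per map instead of the 1 GB default -->
    </property>
    <property>
      <name>test.randomwrite.min_key</name>
      <value>10</value>         <!-- same as the default in configure() -->
    </property>
    <property>
      <name>test.randomwrite.max_key</name>
      <value>100</value>        <!-- shorter keys than the 1000-byte default -->
    </property>
  </configuration>

It would be passed as the trailing argument: bin/hadoop jar build/hadoop-examples.jar writer rand-out random-writer.xml.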
Added: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java?rev=399011&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java (added)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java Tue May 2 11:48:52 2006
@@ -0,0 +1,122 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.fs.*;
+
+/**
+ * This is the trivial map/reduce program that does absolutely nothing
+ * other than use the framework to fragment and sort the input values.
+ *
+ * To run: bin/hadoop jar build/hadoop-examples.jar sort
+ * [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i>
+ *
+ * @author Owen O'Malley
+ */
+public class Sort {
+
+ static void printUsage() {
+ System.out.println("sort [-m <maps>] [-r <reduces>] <input> <output>");
+ System.exit(1);
+ }
+
+ /**
+ * The main driver for the sort program.
+ * Invoke this method to submit the map/reduce job.
+ * @throws IOException When there are communication problems with the
+ * job tracker.
+ */
+ public static void main(String[] args) throws IOException {
+ Configuration defaults = new Configuration();
+
+ JobConf jobConf = new JobConf(defaults, Sort.class);
+ jobConf.setJobName("sorter");
+
+ jobConf.setInputFormat(SequenceFileInputFormat.class);
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+
+ jobConf.setInputKeyClass(BytesWritable.class);
+ jobConf.setInputValueClass(BytesWritable.class);
+ jobConf.setOutputKeyClass(BytesWritable.class);
+ jobConf.setOutputValueClass(BytesWritable.class);
+
+ jobConf.setMapperClass(IdentityMapper.class);
+ jobConf.setReducerClass(IdentityReducer.class);
+
+ JobClient client = new JobClient(jobConf);
+ ClusterStatus cluster = client.getClusterStatus();
+ int numMaps = cluster.getTaskTrackers() *
+ jobConf.getInt("test.sort.maps_per_host", 10);
+ int numReduces = cluster.getTaskTrackers() *
+ jobConf.getInt("test.sort.reduces_per_host", 10);
+ List otherArgs = new ArrayList();
+ for(int i=0; i < args.length; ++i) {
+ try {
+ if ("-m".equals(args[i])) {
+ numMaps = Integer.parseInt(args[++i]);
+ } else if ("-r".equals(args[i])) {
+ numReduces = Integer.parseInt(args[++i]);
+ } else {
+ otherArgs.add(args[i]);
+ }
+ } catch (NumberFormatException except) {
+ System.out.println("ERROR: Integer expected instead of " + args[i]);
+ printUsage();
+ } catch (ArrayIndexOutOfBoundsException except) {
+ System.out.println("ERROR: Required parameter missing from " +
+ args[i-1]);
+ printUsage(); // exits
+ }
+ }
+
+ jobConf.setNumMapTasks(numMaps);
+ jobConf.setNumReduceTasks(numReduces);
+
+ // Make sure there are exactly 2 parameters left.
+ if (otherArgs.size() != 2) {
+ System.out.println("ERROR: Wrong number of parameters: " +
+ otherArgs.size() + " instead of 2.");
+ printUsage();
+ }
+ jobConf.setInputPath(new Path((String) otherArgs.get(0)));
+ jobConf.setOutputPath(new Path((String) otherArgs.get(1)));
+
+ // Uncomment to run locally in a single process
+ //jobConf.set("mapred.job.tracker", "local");
+
+ System.out.println("Running on " +
+ cluster.getTaskTrackers() +
+ " nodes to sort from " +
+ jobConf.getInputPaths()[0] + " into " +
+ jobConf.getOutputPath() + ".");
+ Date startTime = new Date();
+ System.out.println("Job started: " + startTime);
+ JobClient.runJob(jobConf);
+ Date endTime = new Date();
+ System.out.println("Job ended: " + endTime);
+ System.out.println("The job took " +
+ (endTime.getTime() - startTime.getTime()) / 1000 + " seconds.");
+ }
+
+}
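Together the two programs form a simple write-then-sort benchmark: RandomWriter fills a directory with random SequenceFiles and Sort pushes that directory through an identity map/reduce. A sketch of an end-to-end run, reusing the hypothetical paths from above (the -m and -r values are arbitrary; without them Sort sizes itself from ClusterStatus, as main() shows):

  # 10 maps per tasktracker, each writing 1 GB by default
  bin/hadoop jar build/hadoop-examples.jar writer rand-out

  # identity map/reduce over the random data; -m/-r override the
  # cluster-derived task counts
  bin/hadoop jar build/hadoop-examples.jar sort -m 40 -r 20 rand-out sorted-out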