You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/10/03 00:16:16 UTC
svn commit: r581417 - in /lucene/hadoop/trunk: CHANGES.txt
src/examples/org/apache/hadoop/examples/ExampleDriver.java
src/examples/org/apache/hadoop/examples/Sort.java
src/test/org/apache/hadoop/mapred/SortValidator.java
Author: omalley
Date: Tue Oct 2 15:16:15 2007
New Revision: 581417
URL: http://svn.apache.org/viewvc?rev=581417&view=rev
Log:
HADOOP-1926. Add a random text writer so that we can benchmark the
performance of map/reduce using the compression codecs.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=581417&r1=581416&r2=581417&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 2 15:16:15 2007
@@ -336,6 +336,9 @@
HADOOP-120. In ArrayWritable, prevent creation with null value
class, and improve documentation. (Cameron Pope via cutting)
+ HADOOP-1926. Add a random text writer example/benchmark so that we can
+ benchmark compression codecs on random data.
+
Release 0.14.2 - unreleased
BUG FIXES
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=581417&r1=581416&r2=581417&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Tue Oct 2 15:16:15 2007
@@ -38,6 +38,8 @@
"A map/reduce program that counts the matches of a regex in the input.");
pgd.addClass("randomwriter", RandomWriter.class,
"A map/reduce program that writes 10GB of random data per node.");
+ pgd.addClass("randomtextwriter", RandomTextWriter.class,
+ "A map/reduce program that writes 10GB of random textual data per node.");
pgd.addClass("sort", Sort.class, "A map/reduce program that sorts the data written by the random writer.");
pgd.addClass("pi", PiEstimator.class, "A map/reduce program that estimates Pi using monte-carlo method.");
pgd.addClass("pentomino", DistributedPentomino.class,
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java?rev=581417&r1=581416&r2=581417&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java Tue Oct 2 15:16:15 2007
@@ -25,6 +25,8 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -36,12 +38,22 @@
* other than use the framework to fragment and sort the input values.
*
* To run: bin/hadoop jar build/hadoop-examples.jar sort
- * [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i>
+ * [-m <i>maps</i>] [-r <i>reduces</i>]
+ * [-inFormat <i>input format class</i>]
+ * [-outFormat <i>output format class</i>]
+ * [-outKey <i>output key class</i>]
+ * [-outValue <i>output value class</i>]
+ * <i>in-dir</i> <i>out-dir</i>
*/
public class Sort extends Configured implements Tool {
static int printUsage() {
- System.out.println("sort [-m <maps>] [-r <reduces>] <input> <output>");
+ System.out.println("sort [-m <maps>] [-r <reduces>] " +
+ "[-inFormat <input format class>] " +
+ "[-outFormat <output format class>] " +
+ "[-outKey <output key class>] " +
+ "[-outValue <output value class>] " +
+ "<input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
@@ -57,12 +69,6 @@
JobConf jobConf = new JobConf(getConf(), Sort.class);
jobConf.setJobName("sorter");
- jobConf.setInputFormat(SequenceFileInputFormat.class);
- jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-
- jobConf.setOutputKeyClass(BytesWritable.class);
- jobConf.setOutputValueClass(BytesWritable.class);
-
jobConf.setMapperClass(IdentityMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
@@ -72,6 +78,12 @@
jobConf.getInt("test.sort.maps_per_host", 10);
int num_reduces = cluster.getTaskTrackers() *
jobConf.getInt("test.sort.reduces_per_host", cluster.getMaxTasks());
+ Class<? extends InputFormat> inputFormatClass =
+ SequenceFileInputFormat.class;
+ Class<? extends OutputFormat> outputFormatClass =
+ SequenceFileOutputFormat.class;
+ Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
+ Class<? extends Writable> outputValueClass = BytesWritable.class;
List<String> otherArgs = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
@@ -79,6 +91,18 @@
num_maps = Integer.parseInt(args[++i]);
} else if ("-r".equals(args[i])) {
num_reduces = Integer.parseInt(args[++i]);
+ } else if ("-inFormat".equals(args[i])) {
+ inputFormatClass =
+ Class.forName(args[++i]).asSubclass(InputFormat.class);
+ } else if ("-outFormat".equals(args[i])) {
+ outputFormatClass =
+ Class.forName(args[++i]).asSubclass(OutputFormat.class);
+ } else if ("-outKey".equals(args[i])) {
+ outputKeyClass =
+ Class.forName(args[++i]).asSubclass(WritableComparable.class);
+ } else if ("-outValue".equals(args[i])) {
+ outputValueClass =
+ Class.forName(args[++i]).asSubclass(Writable.class);
} else {
otherArgs.add(args[i]);
}
@@ -92,8 +116,15 @@
}
}
+ // Set user-supplied (possibly default) job configs
jobConf.setNumMapTasks(num_maps);
jobConf.setNumReduceTasks(num_reduces);
+
+ jobConf.setInputFormat(inputFormatClass);
+ jobConf.setOutputFormat(outputFormatClass);
+
+ jobConf.setOutputKeyClass(outputKeyClass);
+ jobConf.setOutputValueClass(outputValueClass);
// Make sure there are exactly 2 parameters left.
if (otherArgs.size() != 2) {
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java?rev=581417&r1=581416&r2=581417&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java Tue Oct 2 15:16:15 2007
@@ -31,7 +31,6 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.RecordStatsWritable;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.fs.*;
@@ -131,12 +130,13 @@
}
public static class Map extends MapReduceBase
- implements Mapper<BytesWritable, BytesWritable,
+ implements Mapper<WritableComparable, Writable,
IntWritable, RecordStatsWritable> {
private IntWritable key = null;
- private BytesWritable prevKey = null;
- private Partitioner<BytesWritable, BytesWritable> partitioner = null;
+ private WritableComparable prevKey = null;
+ private Class<? extends WritableComparable> keyClass;
+ private Partitioner<WritableComparable, Writable> partitioner = null;
private int partition = -1;
private int noSortReducers = -1;
private long recordId = -1;
@@ -146,7 +146,7 @@
key = deduceInputFile(job);
if (key == sortOutput) {
- partitioner = new HashPartitioner<BytesWritable, BytesWritable>();
+ partitioner = new HashPartitioner<WritableComparable, Writable>();
// Figure the 'current' partition and no. of reduces of the 'sort'
try {
@@ -163,31 +163,40 @@
}
}
- public void map(BytesWritable key,
- BytesWritable value,
+ @SuppressWarnings("unchecked")
+ public void map(WritableComparable key, Writable value,
OutputCollector<IntWritable, RecordStatsWritable> output,
Reporter reporter) throws IOException {
- BytesWritable bwKey = key;
- BytesWritable bwValue = value;
++recordId;
if (this.key == sortOutput) {
// Check if keys are 'sorted' if this
// record is from sort's output
if (prevKey == null) {
- prevKey = bwKey;
+ prevKey = key;
+ keyClass = prevKey.getClass();
+ System.err.println("Got key #1 class: " + keyClass);
} else {
- if (prevKey.compareTo(bwKey) > 0) {
- throw new IOException("The 'map-reduce' framework wrongly classifed"
- + "(" + prevKey + ") > (" + bwKey + ") for record# "
- + recordId);
+ System.err.println("Got key class: " + key.getClass());
+ // Sanity check
+ if (keyClass != key.getClass()) {
+ throw new IOException("Type mismatch in key: expected " +
+ keyClass.getName() + ", recieved " +
+ key.getClass().getName());
}
- prevKey = bwKey;
+
+ // Check if they were sorted correctly
+ if (prevKey.compareTo(key) > 0) {
+ throw new IOException("The 'map-reduce' framework wrongly" +
+ " classifed (" + prevKey + ") > (" +
+ key + ") "+ "for record# " + recordId);
+ }
+ prevKey = key;
}
// Check if the sorted output is 'partitioned' right
int keyPartition =
- partitioner.getPartition(bwKey, bwValue, noSortReducers);
+ partitioner.getPartition(key, value, noSortReducers);
if (partition != keyPartition) {
throw new IOException("Partitions do not match for record# " +
recordId + " ! - '" + partition + "' v/s '" +
@@ -195,13 +204,16 @@
}
}
+ String keyBytes = key.toString();
+ String valueBytes = value.toString();
int keyValueChecksum =
- (WritableComparator.hashBytes(bwKey.get(), bwKey.getSize()) ^
- WritableComparator.hashBytes(bwValue.get(), bwValue.getSize()));
+ (WritableComparator.hashBytes(keyBytes.getBytes(), keyBytes.length()) ^
+ WritableComparator.hashBytes(valueBytes.getBytes(), valueBytes.length()));
// output (this.key, record-stats)
- output.collect(this.key, new RecordStatsWritable(
- (bwKey.getSize()+bwValue.getSize()), 1, keyValueChecksum));
+ output.collect(this.key,
+ new RecordStatsWritable((keyBytes.length()+valueBytes.length()),
+ 1, keyValueChecksum));
}
}