Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/04/20 14:27:08 UTC
svn commit: r766670 - in /hadoop/core/trunk: ./
src/examples/org/apache/hadoop/examples/
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: sharad
Date: Mon Apr 20 12:27:08 2009
New Revision: 766670
URL: http://svn.apache.org/viewvc?rev=766670&view=rev
Log:
HADOOP-5681. Change examples RandomWriter and RandomTextWriter to use new mapreduce API. Contributed by Amareshwari Sriramadasu.
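For reference, the migration pattern applied throughout this patch: the old org.apache.hadoop.mapred API uses an interface-based Mapper with separate configure(JobConf), OutputCollector, and Reporter objects, while the new org.apache.hadoop.mapreduce API uses an abstract Mapper whose setup() and map() methods receive a single Context. A minimal sketch of the new-API shape (illustrative only, not part of this patch; class and property names are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical mapper showing the new-API pattern used by the patch below.
    public class ExampleMapper extends Mapper<Text, Text, Text, Text> {

      static enum Counters { RECORDS_WRITTEN }

      private long bytesToWrite;

      @Override
      public void setup(Context context) {
        // Replaces configure(JobConf): configuration now comes from the context.
        Configuration conf = context.getConfiguration();
        bytesToWrite = conf.getLong("example.bytes_per_map", 1024);
      }

      @Override
      public void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {
        // Replaces OutputCollector.collect() and the Reporter with Context methods.
        context.write(key, value);
        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
        context.setStatus("wrote one record");
      }
    }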
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomTextWriter.java
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=766670&r1=766669&r2=766670&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Apr 20 12:27:08 2009
@@ -240,6 +240,9 @@
HADOOP-4490. Provide ability to run tasks as job owners.
(Sreekanth Ramakrishnan via yhemanth)
+ HADOOP-5681. Change examples RandomWriter and RandomTextWriter to
+ use new mapreduce API. (Amareshwari Sriramadasu via sharad)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomTextWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomTextWriter.java?rev=766670&r1=766669&r2=766670&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomTextWriter.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomTextWriter.java Mon Apr 20 12:27:08 2009
@@ -29,15 +29,10 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -86,7 +81,7 @@
"[-outFormat <output format class>] " +
"<output>");
ToolRunner.printGenericCommandUsage(System.out);
- return -1;
+ return 2;
}
/**
@@ -94,8 +89,7 @@
*/
static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
- static class Map extends MapReduceBase
- implements Mapper<Text, Text, Text, Text> {
+ static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {
private long numBytesToWrite;
private int minWordsInKey;
@@ -107,18 +101,19 @@
/**
* Save the configuration value that we need to write the data.
*/
- public void configure(JobConf job) {
- numBytesToWrite = job.getLong("test.randomtextwrite.bytes_per_map",
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ numBytesToWrite = conf.getLong("test.randomtextwrite.bytes_per_map",
1*1024*1024*1024);
minWordsInKey =
- job.getInt("test.randomtextwrite.min_words_key", 5);
+ conf.getInt("test.randomtextwrite.min_words_key", 5);
wordsInKeyRange =
- (job.getInt("test.randomtextwrite.max_words_key", 10) -
+ (conf.getInt("test.randomtextwrite.max_words_key", 10) -
minWordsInKey);
minWordsInValue =
- job.getInt("test.randomtextwrite.min_words_value", 10);
+ conf.getInt("test.randomtextwrite.min_words_value", 10);
wordsInValueRange =
- (job.getInt("test.randomtextwrite.max_words_value", 100) -
+ (conf.getInt("test.randomtextwrite.max_words_value", 100) -
minWordsInValue);
}
@@ -126,8 +121,7 @@
* Given an output filename, write a bunch of random records to it.
*/
public void map(Text key, Text value,
- OutputCollector<Text, Text> output,
- Reporter reporter) throws IOException {
+ Context context) throws IOException,InterruptedException {
int itemCount = 0;
while (numBytesToWrite > 0) {
// Generate the key/value
@@ -139,20 +133,20 @@
Text valueWords = generateSentence(noWordsValue);
// Write the sentence
- output.collect(keyWords, valueWords);
+ context.write(keyWords, valueWords);
numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
// Update counters, progress etc.
- reporter.incrCounter(Counters.BYTES_WRITTEN,
- (keyWords.getLength()+valueWords.getLength()));
- reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
+ context.getCounter(Counters.BYTES_WRITTEN).increment(
+ keyWords.getLength() + valueWords.getLength());
+ context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
if (++itemCount % 200 == 0) {
- reporter.setStatus("wrote record " + itemCount + ". " +
+ context.setStatus("wrote record " + itemCount + ". " +
numBytesToWrite + " bytes left.");
}
}
- reporter.setStatus("done with " + itemCount + " records.");
+ context.setStatus("done with " + itemCount + " records.");
}
private Text generateSentence(int noWords) {
@@ -178,33 +172,35 @@
return printUsage();
}
- JobConf job = new JobConf(getConf());
-
- job.setJarByClass(RandomTextWriter.class);
- job.setJobName("random-text-writer");
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
- job.setInputFormat(RandomWriter.RandomInputFormat.class);
- job.setMapperClass(Map.class);
-
- JobClient client = new JobClient(job);
+ Configuration conf = getConf();
+ JobClient client = new JobClient(conf);
ClusterStatus cluster = client.getClusterStatus();
- int numMapsPerHost = job.getInt("test.randomtextwrite.maps_per_host", 10);
- long numBytesToWritePerMap = job.getLong("test.randomtextwrite.bytes_per_map",
+ int numMapsPerHost = conf.getInt("test.randomtextwrite.maps_per_host", 10);
+ long numBytesToWritePerMap = conf.getLong("test.randomtextwrite.bytes_per_map",
1*1024*1024*1024);
if (numBytesToWritePerMap == 0) {
System.err.println("Cannot have test.randomtextwrite.bytes_per_map set to 0");
return -2;
}
- long totalBytesToWrite = job.getLong("test.randomtextwrite.total_bytes",
+ long totalBytesToWrite = conf.getLong("test.randomtextwrite.total_bytes",
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
- job.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
+ conf.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
}
+ conf.setInt("mapred.map.tasks", numMaps);
+
+ Job job = new Job(conf);
+
+ job.setJarByClass(RandomTextWriter.class);
+ job.setJobName("random-text-writer");
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setInputFormatClass(RandomWriter.RandomInputFormat.class);
+ job.setMapperClass(RandomTextMapper.class);
Class<? extends OutputFormat> outputFormatClass =
SequenceFileOutputFormat.class;
@@ -224,10 +220,9 @@
}
}
- job.setOutputFormat(outputFormatClass);
+ job.setOutputFormatClass(outputFormatClass);
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0)));
- job.setNumMapTasks(numMaps);
System.out.println("Running " + numMaps + " maps.");
// reducer NONE
@@ -235,14 +230,14 @@
Date startTime = new Date();
System.out.println("Job started: " + startTime);
- JobClient.runJob(job);
+ int ret = job.waitForCompletion(true) ? 0 : 1;
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
- return 0;
+ return ret;
}
public static void main(String[] args) throws Exception {
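The run() rewrite above replaces the JobConf/JobClient.runJob() driver with a Job object and waitForCompletion(), and moves per-job knobs such as the number of maps into the Configuration before the Job is created. A stand-alone sketch of that driver shape (illustrative, not from this patch; identity mapper/reducer defaults are used and paths come from the command line):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class ExampleDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Per-job knobs are set on the Configuration before the Job is built,
        // mirroring how the patch sets "mapred.map.tasks" instead of setNumMapTasks().
        conf.setInt("mapred.map.tasks", 4);

        Job job = new Job(conf);                 // replaces JobConf in the new API
        job.setJarByClass(ExampleDriver.class);
        job.setJobName("example-copy");
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // waitForCompletion() replaces JobClient.runJob() and reports success/failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }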
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?rev=766670&r1=766669&r2=766670&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Mon Apr 20 12:27:08 2009
@@ -19,7 +19,9 @@
package org.apache.hadoop.examples;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -30,19 +32,11 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -93,19 +87,20 @@
* A custom input format that creates virtual inputs of a single string
* for each map.
*/
- static class RandomInputFormat implements InputFormat<Text, Text> {
+ static class RandomInputFormat extends InputFormat<Text, Text> {
/**
* Generate the requested number of file splits, with the filename
* set to the filename of the output file.
*/
- public InputSplit[] getSplits(JobConf job,
- int numSplits) throws IOException {
- InputSplit[] result = new InputSplit[numSplits];
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ List<InputSplit> result = new ArrayList<InputSplit>();
Path outDir = FileOutputFormat.getOutputPath(job);
- for(int i=0; i < result.length; ++i) {
- result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
- (String[])null);
+ int numSplits =
+ job.getConfiguration().getInt("mapred.map.tasks", 1);
+ for(int i=0; i < numSplits; ++i) {
+ result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
+ (String[])null));
}
return result;
}
@@ -114,43 +109,52 @@
* Return a single record (filename, "") where the filename is taken from
* the file split.
*/
- static class RandomRecordReader implements RecordReader<Text, Text> {
+ static class RandomRecordReader extends RecordReader<Text, Text> {
Path name;
+ Text key = null;
+ Text value = new Text();
public RandomRecordReader(Path p) {
name = p;
}
- public boolean next(Text key, Text value) {
+
+ public void initialize(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException, InterruptedException {
+
+ }
+
+ public boolean nextKeyValue() {
if (name != null) {
+ key = new Text();
key.set(name.getName());
name = null;
return true;
}
return false;
}
- public Text createKey() {
- return new Text();
- }
- public Text createValue() {
- return new Text();
+
+ public Text getCurrentKey() {
+ return key;
}
- public long getPos() {
- return 0;
+
+ public Text getCurrentValue() {
+ return value;
}
+
public void close() {}
+
public float getProgress() {
return 0.0f;
}
}
- public RecordReader<Text, Text> getRecordReader(InputSplit split,
- JobConf job,
- Reporter reporter) throws IOException {
+ public RecordReader<Text, Text> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException, InterruptedException {
return new RandomRecordReader(((FileSplit) split).getPath());
}
}
- static class Map extends MapReduceBase
- implements Mapper<WritableComparable, Writable,
+ static class RandomMapper extends Mapper<WritableComparable, Writable,
BytesWritable, BytesWritable> {
private long numBytesToWrite;
@@ -173,8 +177,7 @@
*/
public void map(WritableComparable key,
Writable value,
- OutputCollector<BytesWritable, BytesWritable> output,
- Reporter reporter) throws IOException {
+ Context context) throws IOException,InterruptedException {
int itemCount = 0;
while (numBytesToWrite > 0) {
int keyLength = minKeySize +
@@ -185,16 +188,16 @@
(valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
- output.collect(randomKey, randomValue);
+ context.write(randomKey, randomValue);
numBytesToWrite -= keyLength + valueLength;
- reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
- reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
+ context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
+ context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
if (++itemCount % 200 == 0) {
- reporter.setStatus("wrote record " + itemCount + ". " +
+ context.setStatus("wrote record " + itemCount + ". " +
numBytesToWrite + " bytes left.");
}
}
- reporter.setStatus("done with " + itemCount + " records.");
+ context.setStatus("done with " + itemCount + " records.");
}
/**
@@ -202,17 +205,17 @@
* the data.
*/
@Override
- public void configure(JobConf job) {
- numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ numBytesToWrite = conf.getLong("test.randomwrite.bytes_per_map",
1*1024*1024*1024);
- minKeySize = job.getInt("test.randomwrite.min_key", 10);
+ minKeySize = conf.getInt("test.randomwrite.min_key", 10);
keySizeRange =
- job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
- minValueSize = job.getInt("test.randomwrite.min_value", 0);
+ conf.getInt("test.randomwrite.max_key", 1000) - minKeySize;
+ minValueSize = conf.getInt("test.randomwrite.min_value", 0);
valueSizeRange =
- job.getInt("test.randomwrite.max_value", 20000) - minValueSize;
+ conf.getInt("test.randomwrite.max_value", 20000) - minValueSize;
}
-
}
/**
@@ -226,42 +229,41 @@
if (args.length == 0) {
System.out.println("Usage: writer <out-dir>");
ToolRunner.printGenericCommandUsage(System.out);
- return -1;
+ return 2;
}
Path outDir = new Path(args[0]);
- JobConf job = new JobConf(getConf());
-
- job.setJarByClass(RandomWriter.class);
- job.setJobName("random-writer");
- FileOutputFormat.setOutputPath(job, outDir);
-
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(BytesWritable.class);
-
- job.setInputFormat(RandomInputFormat.class);
- job.setMapperClass(Map.class);
- job.setReducerClass(IdentityReducer.class);
- job.setOutputFormat(SequenceFileOutputFormat.class);
-
- JobClient client = new JobClient(job);
+ Configuration conf = getConf();
+ JobClient client = new JobClient(conf);
ClusterStatus cluster = client.getClusterStatus();
- int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
- long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
+ int numMapsPerHost = conf.getInt("test.randomwriter.maps_per_host", 10);
+ long numBytesToWritePerMap = conf.getLong("test.randomwrite.bytes_per_map",
1*1024*1024*1024);
if (numBytesToWritePerMap == 0) {
System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
return -2;
}
- long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
+ long totalBytesToWrite = conf.getLong("test.randomwrite.total_bytes",
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
- job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
+ conf.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
}
+ conf.setInt("mapred.map.tasks", numMaps);
+
+ Job job = new Job(conf);
+
+ job.setJarByClass(RandomWriter.class);
+ job.setJobName("random-writer");
+ FileOutputFormat.setOutputPath(job, outDir);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(BytesWritable.class);
+ job.setInputFormatClass(RandomInputFormat.class);
+ job.setMapperClass(RandomMapper.class);
+ job.setReducerClass(Reducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setNumMapTasks(numMaps);
System.out.println("Running " + numMaps + " maps.");
// reducer NONE
@@ -269,14 +271,14 @@
Date startTime = new Date();
System.out.println("Job started: " + startTime);
- JobClient.runJob(job);
+ int ret = job.waitForCompletion(true) ? 0 : 1;
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
- return 0;
+ return ret;
}
public static void main(String[] args) throws Exception {
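The RandomWriter port above also shows the new-API InputFormat/RecordReader contract: getSplits() returns a List<InputSplit> from a JobContext, and RecordReader is an abstract class driven by initialize()/nextKeyValue()/getCurrentKey()/getCurrentValue() rather than next(key, value). A stripped-down reader following that contract (hypothetical class, emitting a single constant record per split):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Hypothetical reader that emits exactly one (key, empty value) record per split.
    public class SingleRecordReader extends RecordReader<Text, Text> {
      private Text key;
      private final Text value = new Text();
      private boolean done = false;

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        // Nothing to open here; a real reader would set up its input stream.
      }

      @Override
      public boolean nextKeyValue() {
        if (done) {
          return false;
        }
        key = new Text("only-record");  // replaces the old in-place next(key, value) update
        done = true;
        return true;
      }

      @Override
      public Text getCurrentKey() { return key; }

      @Override
      public Text getCurrentValue() { return value; }

      @Override
      public float getProgress() { return done ? 1.0f : 0.0f; }

      @Override
      public void close() {}
    }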
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=766670&r1=766669&r2=766670&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Mon Apr 20 12:27:08 2009
@@ -411,6 +411,18 @@
}
/**
+ * Build a job client with the given {@link Configuration},
+ * and connect to the default {@link JobTracker}.
+ *
+ * @param conf the configuration.
+ * @throws IOException
+ */
+ public JobClient(Configuration conf) throws IOException {
+ setConf(conf);
+ init(new JobConf(conf));
+ }
+
+ /**
* Connect to the default {@link JobTracker}.
* @param conf the job configuration.
* @throws IOException
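The JobClient(Configuration) constructor added above lets a caller that only holds a Configuration (rather than a JobConf) query the cluster, which is how both examples now size their map count. A small usage sketch, assuming that constructor:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.ClusterStatus;
    import org.apache.hadoop.mapred.JobClient;

    public class ClusterStatusExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Uses the constructor added by this patch; the caller never builds a JobConf.
        JobClient client = new JobClient(conf);
        ClusterStatus cluster = client.getClusterStatus();
        System.out.println("Task trackers: " + cluster.getTaskTrackers());
      }
    }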
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=766670&r1=766669&r2=766670&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java Mon Apr 20 12:27:08 2009
@@ -491,7 +491,7 @@
// Input formats
/**
* A custom input format that creates virtual inputs of a single string
- * for each map. Using {@link RandomWriter} code.
+ * for each map.
*/
public static class RandomInputFormat implements InputFormat<Text, Text> {