Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/08/26 12:30:33 UTC
svn commit: r807954 - in /hadoop/mapreduce/trunk: ./ src/benchmarks/gridmix2/
src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/
src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/
src/test/mapred/org/apache/hadoop/mapreduce/
Author: cdouglas
Date: Wed Aug 26 10:30:32 2009
New Revision: 807954
URL: http://svn.apache.org/viewvc?rev=807954&view=rev
Log:
MAPREDUCE-788. Update gridmix2 to use the new API. Contributed by Amareshwari Sriramadasu
Added:
hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/
hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/CombinerJobCreator.java
- copied, changed from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java
hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GenericMRLoadJobCreator.java
- copied, changed from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java
hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GridMixRunner.java
- copied, changed from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
- copied, changed from r807951, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
Removed:
hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/benchmarks/gridmix2/gridmix_config.xml
hadoop/mapreduce/trunk/src/benchmarks/gridmix2/rungridmix_2
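For readers following along: the patch moves gridmix2 off the deprecated org.apache.hadoop.mapred types (JobConf, MapReduceBase, OutputCollector, Reporter) onto the org.apache.hadoop.mapreduce API (Job, Context, and the lib.* input/output/map/reduce classes). A minimal sketch of the new-style job setup, with placeholder paths, not taken from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

    public class NewApiSkeleton {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "example"); // replaces new JobConf(...)
        job.setMapperClass(TokenCounterMapper.class);      // library mapper/reducer replace
        job.setCombinerClass(IntSumReducer.class);         // the hand-written MapClass/Reduce
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/in"));   // placeholder paths
        FileOutputFormat.setOutputPath(job, new Path("/out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);  // replaces JobClient.runJob(conf)
      }
    }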
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=807954&r1=807953&r2=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Aug 26 10:30:32 2009
@@ -271,6 +271,9 @@
MAPREDUCE-910. Support counters in MRUnit. (Aaron Kimball via cdouglas)
+ MAPREDUCE-788. Update gridmix2 to use the new API (Amareshwari Sriramadasu
+ via cdouglas)
+
BUG FIXES
MAPREDUCE-878. Rename fair scheduler design doc to
Modified: hadoop/mapreduce/trunk/src/benchmarks/gridmix2/gridmix_config.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/benchmarks/gridmix2/gridmix_config.xml?rev=807954&r1=807953&r2=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/benchmarks/gridmix2/gridmix_config.xml (original)
+++ hadoop/mapreduce/trunk/src/benchmarks/gridmix2/gridmix_config.xml Wed Aug 26 10:30:32 2009
@@ -39,7 +39,7 @@
<property>
<name>streamSort.smallJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+ <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
@@ -74,7 +74,7 @@
</property>
<property>
<name>streamSort.mediumJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
@@ -131,7 +131,7 @@
</property>
<property>
<name>javaSort.smallJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+ <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
<property>
@@ -160,7 +160,7 @@
</property>
<property>
<name>javaSort.mediumJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
@@ -217,7 +217,7 @@
</property>
<property>
<name>combiner.smallJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+ <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
<property>
@@ -246,7 +246,7 @@
</property>
<property>
<name>combiner.mediumJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
@@ -303,7 +303,7 @@
</property>
<property>
<name>monsterQuery.smallJobs.inputFiles</name>
- <value>${FIXCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+ <value>${FIXCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
<property>
@@ -332,7 +332,7 @@
</property>
<property>
<name>monsterQuery.mediumJobs.inputFiles</name>
- <value>${FIXCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${FIXCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
@@ -390,7 +390,7 @@
<property>
<name>webdataScan.smallJobs.inputFiles</name>
- <value>${VARCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+ <value>${VARCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
<property>
@@ -413,7 +413,7 @@
<property>
<name>webdataScan.mediumJobs.inputFiles</name>
- <value>${VARCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${VARCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
@@ -469,7 +469,7 @@
</property>
<property>
<name>webdataSort.smallJobs.inputFiles</name>
- <value>${VARCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+ <value>${VARCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
<property>
@@ -498,7 +498,7 @@
</property>
<property>
<name>webdataSort.mediumJobs.inputFiles</name>
- <value>${VARCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${VARCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
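The glob changes above track the new API's output naming: the new FileOutputFormat writes part-m-NNNNN (map) and part-r-NNNNN (reduce) files rather than plain part-NNNNN, so each pattern gains a wildcard for the task-type segment. A small sketch of pointing a job at such output (the directory name is a placeholder):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class GlobExample {
      public static void main(String[] args) throws Exception {
        Job job = new Job();
        // The extra "*" matches the task-type letter (m or r) in new-API
        // output names such as part-m-00000 and part-r-00000.
        FileInputFormat.setInputPaths(job,
            new Path("/gridmix/data/{part-*-00000,part-*-00001,part-*-00002}"));
      }
    }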
Modified: hadoop/mapreduce/trunk/src/benchmarks/gridmix2/rungridmix_2
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/benchmarks/gridmix2/rungridmix_2?rev=807954&r1=807953&r2=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/benchmarks/gridmix2/rungridmix_2 (original)
+++ hadoop/mapreduce/trunk/src/benchmarks/gridmix2/rungridmix_2 Wed Aug 26 10:30:32 2009
@@ -30,7 +30,7 @@
export HADOOP_CLASSPATH=${APP_JAR}:${EXAMPLE_JAR}:${STREAMING_JAR}
export LIBJARS=${APP_JAR},${EXAMPLE_JAR},${STREAMING_JAR}
-${HADOOP_HOME}/bin/hadoop jar gridmix.jar org.apache.hadoop.mapred.GridMixRunner -libjars ${LIBJARS}
+${HADOOP_HOME}/bin/hadoop jar gridmix.jar org.apache.hadoop.mapreduce.GridMixRunner -libjars ${LIBJARS}
Date=`date +%F-%H-%M-%S-%N`
echo $Date > $1_end.out
Copied: hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/CombinerJobCreator.java (from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/CombinerJobCreator.java?p2=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/CombinerJobCreator.java&p1=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java&r1=807951&r2=807954&rev=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java (original)
+++ hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/CombinerJobCreator.java Wed Aug 26 10:30:32 2009
@@ -16,75 +16,34 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.StringTokenizer;
+package org.apache.hadoop.mapreduce;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
public class CombinerJobCreator {
- public static class MapClass extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, IntWritable> {
-
- private final static IntWritable one = new IntWritable(1);
- private Text word = new Text();
-
- public void map(LongWritable key, Text value,
- OutputCollector<Text, IntWritable> output,
- Reporter reporter) throws IOException {
- String line = value.toString();
- StringTokenizer itr = new StringTokenizer(line);
- while (itr.hasMoreTokens()) {
- word.set(itr.nextToken());
- output.collect(word, one);
- }
- }
- }
-
- public static class Reduce extends MapReduceBase
- implements Reducer<Text, IntWritable, Text, IntWritable> {
-
- public void reduce(Text key, Iterator<IntWritable> values,
- OutputCollector<Text, IntWritable> output,
- Reporter reporter) throws IOException {
- int sum = 0;
- while (values.hasNext()) {
- sum += values.next().get();
- }
- output.collect(key, new IntWritable(sum));
- }
- }
-
- public static JobConf createJob(String[] args) throws Exception {
- JobConf conf = new JobConf(CombinerJobCreator.class);
- conf.setJobName("GridmixCombinerJob");
-
- // the keys are words (strings)
- conf.setOutputKeyClass(Text.class);
- // the values are counts (ints)
- conf.setOutputValueClass(IntWritable.class);
-
- conf.setMapperClass(MapClass.class);
- conf.setCombinerClass(Reduce.class);
- conf.setReducerClass(Reduce.class);
+ public static Job createJob(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ int numReduces = 1;
+ String indir = null;
+ String outdir = null;
boolean mapoutputCompressed = false;
boolean outputCompressed = false;
- // List<String> other_args = new ArrayList<String>();
for (int i = 0; i < args.length; ++i) {
try {
if ("-r".equals(args[i])) {
- conf.setNumReduceTasks(Integer.parseInt(args[++i]));
+ numReduces = Integer.parseInt(args[++i]);
} else if ("-indir".equals(args[i])) {
- FileInputFormat.setInputPaths(conf, args[++i]);
+ indir = args[++i];
} else if ("-outdir".equals(args[i])) {
- FileOutputFormat.setOutputPath(conf, new Path(args[++i]));
-
+ outdir = args[++i];
} else if ("-mapoutputCompressed".equals(args[i])) {
mapoutputCompressed = Boolean.valueOf(args[++i]).booleanValue();
} else if ("-outputCompressed".equals(args[i])) {
@@ -99,8 +58,28 @@
return null;
}
}
- conf.setCompressMapOutput(mapoutputCompressed);
+ conf.setBoolean("mapred.compress.map.output", mapoutputCompressed);
conf.setBoolean("mapred.output.compress", outputCompressed);
- return conf;
+
+ Job job = new Job(conf);
+ job.setJobName("GridmixCombinerJob");
+
+ // the keys are words (strings)
+ job.setOutputKeyClass(Text.class);
+ // the values are counts (ints)
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setMapperClass(TokenCounterMapper.class);
+ job.setCombinerClass(IntSumReducer.class);
+ job.setReducerClass(IntSumReducer.class);
+
+ job.setNumReduceTasks(numReduces);
+ if (indir != null) {
+ FileInputFormat.setInputPaths(job, indir);
+ }
+ if (outdir != null) {
+ FileOutputFormat.setOutputPath(job, new Path(outdir));
+ }
+ return job;
}
}
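A sketch of how the rewritten creator might be driven; the flag values are illustrative, and the driver assumes the org.apache.hadoop.mapreduce.CombinerJobCreator added by this patch is on the classpath:

    import org.apache.hadoop.mapreduce.CombinerJobCreator;
    import org.apache.hadoop.mapreduce.Job;

    public class CombinerJobDriver {
      public static void main(String[] ignored) throws Exception {
        String[] args = {"-r", "2", "-indir", "/gridmix/text-in",
            "-outdir", "/gridmix/combiner-out"};
        // createJob returns null on an unparsable argument.
        Job job = CombinerJobCreator.createJob(args);
        if (job != null) {
          System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
      }
    }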
Copied: hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GenericMRLoadJobCreator.java (from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GenericMRLoadJobCreator.java?p2=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GenericMRLoadJobCreator.java&p1=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java&r1=807951&r2=807954&rev=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java (original)
+++ hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GenericMRLoadJobCreator.java Wed Aug 26 10:30:32 2009
@@ -16,27 +16,29 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapreduce;
import java.util.Random;
import java.util.Stack;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.GenericMRLoadGenerator;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
public class GenericMRLoadJobCreator extends GenericMRLoadGenerator {
- public static JobConf createJob(String[] argv, boolean mapoutputCompressed,
+ public static Job createJob(String[] argv, boolean mapoutputCompressed,
boolean outputCompressed) throws Exception {
- JobConf job = new JobConf();
+ Job job = new Job();
job.setJarByClass(GenericMRLoadGenerator.class);
job.setMapperClass(SampleMapper.class);
job.setReducerClass(SampleReducer.class);
@@ -46,28 +48,29 @@
if (null == FileOutputFormat.getOutputPath(job)) {
// No output dir? No writes
- job.setOutputFormat(NullOutputFormat.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
}
+ Configuration conf = job.getConfiguration();
if (0 == FileInputFormat.getInputPaths(job).length) {
// No input dir? Generate random data
System.err.println("No input path; ignoring InputFormat");
confRandom(job);
- } else if (null != job.getClass("mapred.indirect.input.format", null)) {
+ } else if (null != conf.getClass("mapred.indirect.input.format", null)) {
// specified IndirectInputFormat? Build src list
- JobClient jClient = new JobClient(job);
+ JobClient jClient = new JobClient(conf);
Path sysdir = jClient.getSystemDir();
Random r = new Random();
Path indirInputFile = new Path(sysdir, Integer.toString(r
.nextInt(Integer.MAX_VALUE), 36)
+ "_files");
- job.set("mapred.indirect.input.file", indirInputFile.toString());
+ conf.set("mapred.indirect.input.file", indirInputFile.toString());
SequenceFile.Writer writer = SequenceFile.createWriter(sysdir
- .getFileSystem(job), job, indirInputFile, LongWritable.class,
+ .getFileSystem(conf), conf, indirInputFile, LongWritable.class,
Text.class, SequenceFile.CompressionType.NONE);
try {
for (Path p : FileInputFormat.getInputPaths(job)) {
- FileSystem fs = p.getFileSystem(job);
+ FileSystem fs = p.getFileSystem(conf);
Stack<Path> pathstack = new Stack<Path>();
pathstack.push(p);
while (!pathstack.empty()) {
@@ -89,10 +92,9 @@
}
}
- job.setCompressMapOutput(mapoutputCompressed);
- job.setBoolean("mapred.output.compress", outputCompressed);
+ conf.setBoolean("mapred.compress.map.output", mapoutputCompressed);
+ conf.setBoolean("mapred.output.compress", outputCompressed);
return job;
-
}
}
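A pattern worth noting in this file: JobConf doubled as job definition and configuration, while the new API separates the two, so raw keys are now set through Job.getConfiguration(). A minimal sketch of the compression setup used above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CompressionSetup {
      public static void main(String[] args) throws Exception {
        Job job = new Job();
        Configuration conf = job.getConfiguration();
        // Old API: jobConf.setCompressMapOutput(true); the new API sets the key directly.
        conf.setBoolean("mapred.compress.map.output", true);
        conf.setBoolean("mapred.output.compress", true);
      }
    }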
Copied: hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GridMixRunner.java (from r807951, hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GridMixRunner.java?p2=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GridMixRunner.java&p1=hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java&r1=807951&r2=807954&rev=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java (original)
+++ hadoop/mapreduce/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapreduce/GridMixRunner.java Wed Aug 26 10:30:32 2009
@@ -16,15 +16,15 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapreduce;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -32,11 +32,15 @@
import org.apache.hadoop.examples.Sort;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.jobcontrol.*;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
import org.apache.hadoop.streaming.StreamJob;
public class GridMixRunner {
@@ -66,17 +70,17 @@
private enum Size {
SMALL("small", // name
- "/{part-00000,part-00001,part-00002}", // default input subset
- NUM_OF_SMALL_JOBS_PER_CLASS, // defuault num jobs
- NUM_OF_REDUCERS_FOR_SMALL_JOB), // default num reducers
+ "/{part-*-00000,part-*-00001,part-*-00002}", // default input subset
+ NUM_OF_SMALL_JOBS_PER_CLASS, // default num jobs
+ NUM_OF_REDUCERS_FOR_SMALL_JOB), // default num reducers
MEDIUM("medium", // name
- "/{part-000*0,part-000*1,part-000*2}", // default input subset
- NUM_OF_MEDIUM_JOBS_PER_CLASS, // defuault num jobs
- NUM_OF_REDUCERS_FOR_MEDIUM_JOB), // default num reducers
+ "/{part-*-000*0, part-*-000*1, part-*-000*2}", // default input subset
+ NUM_OF_MEDIUM_JOBS_PER_CLASS, // defuault num jobs
+ NUM_OF_REDUCERS_FOR_MEDIUM_JOB), // default num reducers
LARGE("large", // name
- "", // default input subset
- NUM_OF_LARGE_JOBS_PER_CLASS, // defuault num jobs
- NUM_OF_REDUCERS_FOR_LARGE_JOB); // default num reducers
+ "", // default input subset
+ NUM_OF_LARGE_JOBS_PER_CLASS, // default num jobs
+ NUM_OF_REDUCERS_FOR_LARGE_JOB); // default num reducers
private final String str;
private final String path;
@@ -107,7 +111,8 @@
public void addJob(int numReducers, boolean mapoutputCompressed,
boolean outputCompressed, Size size, JobControl gridmix) {
final String prop = String.format("streamSort.%sJobs.inputFiles", size);
- final String indir = getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
+ final String indir =
+ getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
final String outdir = addTSSuffix("perf-out/stream-out-dir-" + size);
StringBuffer sb = new StringBuffer();
@@ -120,12 +125,12 @@
clearDir(outdir);
try {
- JobConf jobconf = StreamJob.createJob(args);
- jobconf.setJobName("GridmixStreamingSorter." + size);
- jobconf.setCompressMapOutput(mapoutputCompressed);
- jobconf.setBoolean("mapred.output.compress", outputCompressed);
- Job job = new Job(jobconf);
- gridmix.addJob(job);
+ Configuration conf = StreamJob.createJob(args);
+ conf.setBoolean("mapred.output.compress", outputCompressed);
+ conf.setBoolean("mapred.compress.map.output", mapoutputCompressed);
+ Job job = new Job(conf, "GridmixStreamingSorter." + size);
+ ControlledJob cjob = new ControlledJob(job, null);
+ gridmix.addJob(cjob);
} catch (Exception ex) {
ex.printStackTrace();
}
@@ -136,33 +141,34 @@
public void addJob(int numReducers, boolean mapoutputCompressed,
boolean outputCompressed, Size size, JobControl gridmix) {
final String prop = String.format("javaSort.%sJobs.inputFiles", size);
- final String indir = getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
+ final String indir = getInputDirsFor(prop,
+ size.defaultPath(VARINFLTEXT));
final String outdir = addTSSuffix("perf-out/sort-out-dir-" + size);
clearDir(outdir);
try {
- JobConf jobConf = new JobConf();
- jobConf.setJarByClass(Sort.class);
- jobConf.setJobName("GridmixJavaSorter." + size);
- jobConf.setMapperClass(IdentityMapper.class);
- jobConf.setReducerClass(IdentityReducer.class);
-
- jobConf.setNumReduceTasks(numReducers);
- jobConf.setInputFormat(org.apache.hadoop.mapred.KeyValueTextInputFormat.class);
- jobConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
-
- jobConf.setOutputKeyClass(org.apache.hadoop.io.Text.class);
- jobConf.setOutputValueClass(org.apache.hadoop.io.Text.class);
- jobConf.setCompressMapOutput(mapoutputCompressed);
- jobConf.setBoolean("mapred.output.compress", outputCompressed);
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.output.compress", outputCompressed);
+ conf.setBoolean("mapred.compress.map.output", mapoutputCompressed);
+ Job job = new Job(conf);
+ job.setJarByClass(Sort.class);
+ job.setJobName("GridmixJavaSorter." + size);
+ job.setMapperClass(Mapper.class);
+ job.setReducerClass(Reducer.class);
+
+ job.setNumReduceTasks(numReducers);
+ job.setInputFormatClass(KeyValueTextInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
- FileInputFormat.addInputPaths(jobConf, indir);
- FileOutputFormat.setOutputPath(jobConf, new Path(outdir));
+ job.setOutputKeyClass(org.apache.hadoop.io.Text.class);
+ job.setOutputValueClass(org.apache.hadoop.io.Text.class);
- Job job = new Job(jobConf);
- gridmix.addJob(job);
+ FileInputFormat.addInputPaths(job, indir);
+ FileOutputFormat.setOutputPath(job, new Path(outdir));
+ ControlledJob cjob = new ControlledJob(job, null);
+ gridmix.addJob(cjob);
} catch (Exception ex) {
ex.printStackTrace();
}
@@ -174,12 +180,17 @@
boolean outputCompressed, Size size, JobControl gridmix) {
final String prop = String.format("webdataScan.%sJobs.inputFiles", size);
final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
- final String outdir = addTSSuffix("perf-out/webdata-scan-out-dir-" + size);
+ final String outdir = addTSSuffix("perf-out/webdata-scan-out-dir-"
+ + size);
StringBuffer sb = new StringBuffer();
sb.append("-keepmap 0.2 ");
sb.append("-keepred 5 ");
- sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
- sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
+ sb.append("-inFormat");
+ sb.append(" org.apache.hadoop.mapreduce." +
+ "lib.input.SequenceFileInputFormat ");
+ sb.append("-outFormat");
+ sb.append(" org.apache.hadoop.mapreduce." +
+ "lib.output.SequenceFileOutputFormat ");
sb.append("-outKey org.apache.hadoop.io.Text ");
sb.append("-outValue org.apache.hadoop.io.Text ");
sb.append("-indir ").append(indir).append(" ");
@@ -189,11 +200,11 @@
String[] args = sb.toString().split(" ");
clearDir(outdir);
try {
- JobConf jobconf = GenericMRLoadJobCreator.createJob(
+ Job job = GenericMRLoadJobCreator.createJob(
args, mapoutputCompressed, outputCompressed);
- jobconf.setJobName("GridmixWebdatascan." + size);
- Job job = new Job(jobconf);
- gridmix.addJob(job);
+ job.setJobName("GridmixWebdatascan." + size);
+ ControlledJob cjob = new ControlledJob(job, null);
+ gridmix.addJob(cjob);
} catch (Exception ex) {
System.out.println(ex.getStackTrace());
}
@@ -211,16 +222,17 @@
sb.append("-r ").append(numReducers).append(" ");
sb.append("-indir ").append(indir).append(" ");
sb.append("-outdir ").append(outdir);
- sb.append("-mapoutputCompressed ").append(mapoutputCompressed).append(" ");
+ sb.append("-mapoutputCompressed ");
+ sb.append(mapoutputCompressed).append(" ");
sb.append("-outputCompressed ").append(outputCompressed);
String[] args = sb.toString().split(" ");
clearDir(outdir);
try {
- JobConf jobconf = CombinerJobCreator.createJob(args);
- jobconf.setJobName("GridmixCombinerJob." + size);
- Job job = new Job(jobconf);
- gridmix.addJob(job);
+ Job job = CombinerJobCreator.createJob(args);
+ job.setJobName("GridmixCombinerJob." + size);
+ ControlledJob cjob = new ControlledJob(job, null);
+ gridmix.addJob(cjob);
} catch (Exception ex) {
ex.printStackTrace();
}
@@ -230,13 +242,14 @@
MONSTERQUERY("monsterQuery") {
public void addJob(int numReducers, boolean mapoutputCompressed,
boolean outputCompressed, Size size, JobControl gridmix) {
- final String prop = String.format("monsterQuery.%sJobs.inputFiles", size);
+ final String prop =
+ String.format("monsterQuery.%sJobs.inputFiles", size);
final String indir = getInputDirsFor(prop, size.defaultPath(FIXCOMPSEQ));
final String outdir = addTSSuffix("perf-out/mq-out-dir-" + size);
int iter = 3;
try {
- Job pjob = null;
- Job job = null;
+ ControlledJob pjob = null;
+ ControlledJob cjob = null;
for (int i = 0; i < iter; i++) {
String outdirfull = outdir + "." + i;
String indirfull = (0 == i) ? indir : outdir + "." + (i - 1);
@@ -245,8 +258,12 @@
StringBuffer sb = new StringBuffer();
sb.append("-keepmap 10 ");
sb.append("-keepred 40 ");
- sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
- sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
+ sb.append("-inFormat");
+ sb.append(" org.apache.hadoop.mapreduce." +
+ "lib.input.SequenceFileInputFormat ");
+ sb.append("-outFormat");
+ sb.append(" org.apache.hadoop.mapreduce." +
+ "lib.output.SequenceFileOutputFormat ");
sb.append("-outKey org.apache.hadoop.io.Text ");
sb.append("-outValue org.apache.hadoop.io.Text ");
sb.append("-indir ").append(indirfull).append(" ");
@@ -260,15 +277,15 @@
System.out.println(ex.toString());
}
- JobConf jobconf = GenericMRLoadJobCreator.createJob(
+ Job job = GenericMRLoadJobCreator.createJob(
args, mapoutputCompressed, outputCompressed);
- jobconf.setJobName("GridmixMonsterQuery." + size);
- job = new Job(jobconf);
+ job.setJobName("GridmixMonsterQuery." + size);
+ cjob = new ControlledJob(job, null);
if (pjob != null) {
- job.addDependingJob(pjob);
+ cjob.addDependingJob(pjob);
}
- gridmix.addJob(job);
- pjob = job;
+ gridmix.addJob(cjob);
+ pjob = cjob;
}
} catch (Exception e) {
System.out.println(e.getStackTrace());
@@ -281,13 +298,16 @@
boolean outputCompressed, Size size, JobControl gridmix) {
final String prop = String.format("webdataSort.%sJobs.inputFiles", size);
final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
- final String outdir = addTSSuffix("perf-out/webdata-sort-out-dir-" + size);
+ final String outdir =
+ addTSSuffix("perf-out/webdata-sort-out-dir-" + size);
StringBuffer sb = new StringBuffer();
sb.append("-keepmap 100 ");
sb.append("-keepred 100 ");
- sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
- sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
+ sb.append("-inFormat org.apache.hadoop.mapreduce." +
+ "lib.input.SequenceFileInputFormat ");
+ sb.append("-outFormat org.apache.hadoop.mapreduce." +
+ "lib.output.SequenceFileOutputFormat ");
sb.append("-outKey org.apache.hadoop.io.Text ");
sb.append("-outValue org.apache.hadoop.io.Text ");
sb.append("-indir ").append(indir).append(" ");
@@ -297,11 +317,11 @@
String[] args = sb.toString().split(" ");
clearDir(outdir);
try {
- JobConf jobconf = GenericMRLoadJobCreator.createJob(
+ Job job = GenericMRLoadJobCreator.createJob(
args, mapoutputCompressed, outputCompressed);
- jobconf.setJobName("GridmixWebdataSort." + size);
- Job job = new Job(jobconf);
- gridmix.addJob(job);
+ job.setJobName("GridmixWebdataSort." + size);
+ ControlledJob cjob = new ControlledJob(job, null);
+ gridmix.addJob(cjob);
} catch (Exception ex) {
System.out.println(ex.getStackTrace());
}
@@ -443,7 +463,8 @@
for (GridMixJob jobtype : EnumSet.allOf(GridMixJob.class)) {
addAllJobs(jobtype);
}
- System.out.println("total " + gridmix.getWaitingJobs().size() + " jobs");
+ System.out.println("total " +
+ gridmix.getWaitingJobList().size() + " jobs");
}
class SimpleStats {
@@ -480,35 +501,35 @@
}
}
- private TreeMap<String, String> getStatForJob(Job job) {
+ private TreeMap<String, String> getStatForJob(ControlledJob cjob) {
TreeMap<String, String> retv = new TreeMap<String, String>();
- String mapreduceID = job.getAssignedJobID().toString();
- JobClient jc = job.getJobClient();
- JobConf jobconf = job.getJobConf();
- String jobName = jobconf.getJobName();
- retv.put("JobId", mapreduceID);
+ JobID mapreduceID = cjob.getMapredJobID();
+ Job job = cjob.getJob();
+ String jobName = job.getJobName();
+ retv.put("JobId", mapreduceID.toString());
retv.put("JobName", jobName);
TaskExecutionStats theTaskExecutionStats = new TaskExecutionStats();
try {
- RunningJob running = jc.getJob(JobID.forName(mapreduceID));
- Counters jobCounters = running.getCounters();
- Iterator<Group> groups = jobCounters.iterator();
+ Counters jobCounters = job.getCounters();
+ Iterator<CounterGroup> groups = jobCounters.iterator();
while (groups.hasNext()) {
- Group g = groups.next();
+ CounterGroup g = groups.next();
String gn = g.getName();
- Iterator<Counters.Counter> cs = g.iterator();
+ Iterator<Counter> cs = g.iterator();
while (cs.hasNext()) {
- Counters.Counter c = cs.next();
+ Counter c = cs.next();
String n = c.getName();
- long v = c.getCounter();
+ long v = c.getValue();
retv.put(mapreduceID + "." + jobName + "." + gn + "." + n, "" + v);
}
}
- TaskReport[] maps = jc.getMapTaskReports(JobID.forName(mapreduceID));
+ JobClient jc = new JobClient(job.getConfiguration());
+ TaskReport[] maps = jc
+ .getMapTaskReports((org.apache.hadoop.mapred.JobID)mapreduceID);
TaskReport[] reduces = jc
- .getReduceTaskReports(JobID.forName(mapreduceID));
+ .getReduceTaskReports((org.apache.hadoop.mapred.JobID)mapreduceID);
retv.put(mapreduceID + "." + jobName + "." + "numOfMapTasks", ""
+ maps.length);
retv.put(mapreduceID + "." + jobName + "." + "numOfReduceTasks", ""
@@ -562,15 +583,15 @@
+ startTime);
retv.put(mapreduceID + "." + jobName + "." + "reduceEndTime", ""
+ finishTime);
- if (job.getState() == Job.SUCCESS) {
+ if (cjob.getJobState() == ControlledJob.State.SUCCESS) {
retv.put(mapreduceID + "." + "jobStatus", "successful");
- } else if (job.getState() == Job.FAILED) {
+ } else if (cjob.getJobState() == ControlledJob.State.FAILED) {
retv.put(mapreduceID + "." + jobName + "." + "jobStatus", "failed");
} else {
retv.put(mapreduceID + "." + jobName + "." + "jobStatus", "unknown");
}
- Iterator<Entry<String, SimpleStats>> entries = theTaskExecutionStats.theStats
- .entrySet().iterator();
+ Iterator<Entry<String, SimpleStats>> entries =
+ theTaskExecutionStats.theStats.entrySet().iterator();
while (entries.hasNext()) {
Entry<String, SimpleStats> e = entries.next();
SimpleStats v = e.getValue();
@@ -599,7 +620,7 @@
}
}
- private void printStatsForJobs(ArrayList<Job> jobs) {
+ private void printStatsForJobs(List<ControlledJob> jobs) {
for (int i = 0; i < jobs.size(); i++) {
printJobStat(getStatForJob(jobs.get(i)));
}
@@ -612,15 +633,15 @@
long startTime = System.currentTimeMillis();
while (!gridmix.allFinished()) {
System.out.println("Jobs in waiting state: "
- + gridmix.getWaitingJobs().size());
+ + gridmix.getWaitingJobList().size());
System.out.println("Jobs in ready state: "
- + gridmix.getReadyJobs().size());
+ + gridmix.getReadyJobsList().size());
System.out.println("Jobs in running state: "
- + gridmix.getRunningJobs().size());
+ + gridmix.getRunningJobList().size());
System.out.println("Jobs in success state: "
- + gridmix.getSuccessfulJobs().size());
+ + gridmix.getSuccessfulJobList().size());
System.out.println("Jobs in failed state: "
- + gridmix.getFailedJobs().size());
+ + gridmix.getFailedJobList().size());
System.out.println("\n");
try {
@@ -630,8 +651,8 @@
}
}
long endTime = System.currentTimeMillis();
- ArrayList<Job> fail = gridmix.getFailedJobs();
- ArrayList<Job> succeed = gridmix.getSuccessfulJobs();
+ List<ControlledJob> fail = gridmix.getFailedJobList();
+ List<ControlledJob> succeed = gridmix.getSuccessfulJobList();
int numOfSuccessfulJob = succeed.size();
if (numOfSuccessfulJob > 0) {
System.out.println(numOfSuccessfulJob + " jobs succeeded");
@@ -646,7 +667,7 @@
}
System.out.println("GridMix results:");
System.out.println("Total num of Jobs: " + numOfJobs);
- System.out.println("ExecutionTime: " + ((endTime-startTime)/1000));
+ System.out.println("ExecutionTime: " + ((endTime-startTime) / 1000));
gridmix.stop();
}
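The GridMixRunner changes swap the old mapred.jobcontrol.Job wrapper for the ControlledJob/JobControl pair and move counter reading to the new Counter/CounterGroup types. A sketch of how the pieces fit together, assuming a fully configured Job:

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.CounterGroup;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

    public class JobControlSketch {
      public static void runAll(Job job) throws Exception {
        JobControl control = new JobControl("gridmix");
        ControlledJob cjob = new ControlledJob(job, null); // null: no depending jobs
        control.addJob(cjob);
        new Thread(control).start();                       // JobControl is a Runnable
        while (!control.allFinished()) {
          Thread.sleep(1000);
        }
        System.out.println(control.getSuccessfulJobList().size() + " succeeded, "
            + control.getFailedJobList().size() + " failed");
        for (CounterGroup g : job.getCounters()) {         // new counters API
          for (Counter c : g) {
            System.out.println(g.getName() + "." + c.getName() + " = " + c.getValue());
          }
        }
        control.stop();
      }
    }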
Copied: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java (from r807951, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java?p2=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java&p1=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java&r1=807951&r2=807954&rev=807954&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java Wed Aug 26 10:30:32 2009
@@ -16,14 +16,14 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
-import java.util.Iterator;
+import java.util.List;
import java.util.Random;
import java.util.Stack;
@@ -38,7 +38,13 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
@@ -61,7 +67,7 @@
/**
* Configure a job given argv.
*/
- public static boolean parseArgs(String[] argv, JobConf job) throws IOException {
+ public static boolean parseArgs(String[] argv, Job job) throws IOException {
if (argv.length < 1) {
return 0 == printUsage();
}
@@ -72,15 +78,13 @@
return 0 == printUsage();
}
try {
- if ("-m".equals(argv[i])) {
- job.setNumMapTasks(Integer.parseInt(argv[++i]));
- } else if ("-r".equals(argv[i])) {
+ if ("-r".equals(argv[i])) {
job.setNumReduceTasks(Integer.parseInt(argv[++i]));
} else if ("-inFormat".equals(argv[i])) {
- job.setInputFormat(
+ job.setInputFormatClass(
Class.forName(argv[++i]).asSubclass(InputFormat.class));
} else if ("-outFormat".equals(argv[i])) {
- job.setOutputFormat(
+ job.setOutputFormatClass(
Class.forName(argv[++i]).asSubclass(OutputFormat.class));
} else if ("-outKey".equals(argv[i])) {
job.setOutputKeyClass(
@@ -89,18 +93,20 @@
job.setOutputValueClass(
Class.forName(argv[++i]).asSubclass(Writable.class));
} else if ("-keepmap".equals(argv[i])) {
- job.set("hadoop.sort.map.keep.percent", argv[++i]);
+ job.getConfiguration().set("hadoop.sort.map.keep.percent",
+ argv[++i]);
} else if ("-keepred".equals(argv[i])) {
- job.set("hadoop.sort.reduce.keep.percent", argv[++i]);
+ job.getConfiguration().set("hadoop.sort.reduce.keep.percent",
+ argv[++i]);
} else if ("-outdir".equals(argv[i])) {
FileOutputFormat.setOutputPath(job, new Path(argv[++i]));
} else if ("-indir".equals(argv[i])) {
FileInputFormat.addInputPaths(job, argv[++i]);
} else if ("-inFormatIndirect".equals(argv[i])) {
- job.setClass("mapred.indirect.input.format",
+ job.getConfiguration().setClass("mapred.indirect.input.format",
Class.forName(argv[++i]).asSubclass(InputFormat.class),
InputFormat.class);
- job.setInputFormat(IndirectInputFormat.class);
+ job.setInputFormatClass(IndirectInputFormat.class);
} else {
System.out.println("Unexpected argument: " + argv[i]);
return 0 == printUsage();
@@ -116,7 +122,7 @@
}
public int run(String [] argv) throws Exception {
- JobConf job = new JobConf(getConf());
+ Job job = new Job(getConf());
job.setJarByClass(GenericMRLoadGenerator.class);
job.setMapperClass(SampleMapper.class);
job.setReducerClass(SampleReducer.class);
@@ -124,30 +130,31 @@
return -1;
}
+ Configuration conf = job.getConfiguration();
if (null == FileOutputFormat.getOutputPath(job)) {
// No output dir? No writes
- job.setOutputFormat(NullOutputFormat.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
}
if (0 == FileInputFormat.getInputPaths(job).length) {
// No input dir? Generate random data
System.err.println("No input path; ignoring InputFormat");
confRandom(job);
- } else if (null != job.getClass("mapred.indirect.input.format", null)) {
+ } else if (null != conf.getClass("mapred.indirect.input.format", null)) {
// specified IndirectInputFormat? Build src list
- JobClient jClient = new JobClient(job);
+ JobClient jClient = new JobClient(conf);
Path sysdir = jClient.getSystemDir();
Random r = new Random();
Path indirInputFile = new Path(sysdir,
Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
- job.set("mapred.indirect.input.file", indirInputFile.toString());
+ conf.set("mapred.indirect.input.file", indirInputFile.toString());
SequenceFile.Writer writer = SequenceFile.createWriter(
- sysdir.getFileSystem(job), job, indirInputFile,
+ sysdir.getFileSystem(conf), conf, indirInputFile,
LongWritable.class, Text.class,
SequenceFile.CompressionType.NONE);
try {
for (Path p : FileInputFormat.getInputPaths(job)) {
- FileSystem fs = p.getFileSystem(job);
+ FileSystem fs = p.getFileSystem(conf);
Stack<Path> pathstack = new Stack<Path>();
pathstack.push(p);
while (!pathstack.empty()) {
@@ -171,14 +178,14 @@
Date startTime = new Date();
System.out.println("Job started: " + startTime);
- JobClient.runJob(job);
+ int ret = job.waitForCompletion(true) ? 0 : 1;
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
- return 0;
+ return ret;
}
/**
@@ -190,24 +197,27 @@
System.exit(res);
}
- static class RandomInputFormat implements InputFormat {
+ static class RandomInputFormat extends InputFormat<Text, Text> {
- public InputSplit[] getSplits(JobConf conf, int numSplits) {
- InputSplit[] splits = new InputSplit[numSplits];
+ public List<InputSplit> getSplits(JobContext job) {
+ int numSplits = job.getConfiguration().getInt("mapred.map.tasks", 1);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
for (int i = 0; i < numSplits; ++i) {
- splits[i] = new IndirectInputFormat.IndirectSplit(
- new Path("ignore" + i), 1);
+ splits.add(new IndirectInputFormat.IndirectSplit(
+ new Path("ignore" + i), 1));
}
return splits;
}
- public RecordReader<Text,Text> getRecordReader(InputSplit split,
- JobConf job, Reporter reporter) throws IOException {
+ public RecordReader<Text,Text> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException {
final IndirectInputFormat.IndirectSplit clSplit =
(IndirectInputFormat.IndirectSplit)split;
return new RecordReader<Text,Text>() {
boolean once = true;
- public boolean next(Text key, Text value) {
+ Text key = new Text();
+ Text value = new Text();
+ public boolean nextKeyValue() {
if (once) {
key.set(clSplit.getPath().toString());
once = false;
@@ -215,9 +225,10 @@
}
return false;
}
- public Text createKey() { return new Text(); }
- public Text createValue() { return new Text(); }
- public long getPos() { return 0; }
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {}
+ public Text getCurrentKey() { return key; }
+ public Text getCurrentValue() { return value; }
public void close() { }
public float getProgress() { return 0.0f; }
};
@@ -226,8 +237,7 @@
static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
- static class RandomMapOutput extends MapReduceBase
- implements Mapper<Text,Text,Text,Text> {
+ static class RandomMapOutput extends Mapper<Text,Text,Text,Text> {
StringBuilder sentence = new StringBuilder();
int keymin;
int keymax;
@@ -248,17 +258,18 @@
return sentence.length();
}
- public void configure(JobConf job) {
- bytesToWrite = job.getLong("test.randomtextwrite.bytes_per_map",
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ bytesToWrite = conf.getLong("test.randomtextwrite.bytes_per_map",
1*1024*1024*1024);
- keymin = job.getInt("test.randomtextwrite.min_words_key", 5);
- keymax = job.getInt("test.randomtextwrite.max_words_key", 10);
- valmin = job.getInt("test.randomtextwrite.min_words_value", 5);
- valmax = job.getInt("test.randomtextwrite.max_words_value", 10);
+ keymin = conf.getInt("test.randomtextwrite.min_words_key", 5);
+ keymax = conf.getInt("test.randomtextwrite.max_words_key", 10);
+ valmin = conf.getInt("test.randomtextwrite.min_words_value", 5);
+ valmax = conf.getInt("test.randomtextwrite.max_words_value", 10);
}
- public void map(Text key, Text val, OutputCollector<Text,Text> output,
- Reporter reporter) throws IOException {
+ public void map(Text key, Text val, Context context)
+ throws IOException, InterruptedException {
long acc = 0L;
long recs = 0;
final int keydiff = keymax - keymin;
@@ -269,14 +280,14 @@
(0 == keydiff ? 0 : r.nextInt(keydiff)));
recacc += generateSentence(val, valmin +
(0 == valdiff ? 0 : r.nextInt(valdiff)));
- output.collect(key, val);
+ context.write(key, val);
++recs;
acc += recacc;
- reporter.incrCounter(Counters.BYTES_WRITTEN, recacc);
- reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
- reporter.setStatus(acc + "/" + (bytesToWrite - acc) + " bytes");
+ context.getCounter(Counters.BYTES_WRITTEN).increment(recacc);
+ context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
+ context.setStatus(acc + "/" + (bytesToWrite - acc) + " bytes");
}
- reporter.setStatus("Wrote " + recs + " records");
+ context.setStatus("Wrote " + recs + " records");
}
}
@@ -284,84 +295,95 @@
/**
* When no input dir is specified, generate random data.
*/
- protected static void confRandom(JobConf job)
+ protected static void confRandom(Job job)
throws IOException {
// from RandomWriter
- job.setInputFormat(RandomInputFormat.class);
+ job.setInputFormatClass(RandomInputFormat.class);
job.setMapperClass(RandomMapOutput.class);
- final ClusterStatus cluster = new JobClient(job).getClusterStatus();
- int numMapsPerHost = job.getInt("test.randomtextwrite.maps_per_host", 10);
+ Configuration conf = job.getConfiguration();
+ final ClusterStatus cluster = new JobClient(conf).getClusterStatus();
+ int numMapsPerHost = conf.getInt("test.randomtextwrite.maps_per_host", 10);
long numBytesToWritePerMap =
- job.getLong("test.randomtextwrite.bytes_per_map", 1*1024*1024*1024);
+ conf.getLong("test.randomtextwrite.bytes_per_map", 1*1024*1024*1024);
if (numBytesToWritePerMap == 0) {
throw new IOException(
"Cannot have test.randomtextwrite.bytes_per_map set to 0");
}
- long totalBytesToWrite = job.getLong("test.randomtextwrite.total_bytes",
+ long totalBytesToWrite = conf.getLong("test.randomtextwrite.total_bytes",
numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
int numMaps = (int)(totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
- job.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
+ conf.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
}
- job.setNumMapTasks(numMaps);
+ conf.setInt("mapred.map.tasks", numMaps);
}
// Sampling //
-
- static abstract class SampleMapReduceBase<K extends WritableComparable,
- V extends Writable>
- extends MapReduceBase {
+ static abstract class SampleMapBase<K extends WritableComparable<?>,
+ V extends Writable> extends Mapper<K, V, K, V> {
private long total;
private long kept = 0;
private float keep;
- protected void setKeep(float keep) {
- this.keep = keep;
- }
-
- protected void emit(K key, V val, OutputCollector<K,V> out)
- throws IOException {
+ public void setup(Context context) {
+ this.keep = context.getConfiguration().
+ getFloat("hadoop.sort.map.keep.percent", (float)100.0) / (float)100.0;
+ }
+
+ protected void emit(K key, V val, Context context)
+ throws IOException, InterruptedException {
++total;
while((float) kept / total < keep) {
++kept;
- out.collect(key, val);
+ context.write(key, val);
}
}
}
- public static class SampleMapper<K extends WritableComparable, V extends Writable>
- extends SampleMapReduceBase<K,V> implements Mapper<K,V,K,V> {
+ static abstract class SampleReduceBase<K extends WritableComparable<?>,
+ V extends Writable> extends Reducer<K, V, K, V> {
+ private long total;
+ private long kept = 0;
+ private float keep;
- public void configure(JobConf job) {
- setKeep(job.getFloat("hadoop.sort.map.keep.percent", (float)100.0) /
- (float)100.0);
+ public void setup(Context context) {
+ this.keep = context.getConfiguration().getFloat(
+ "hadoop.sort.reduce.keep.percent", (float)100.0) / (float)100.0;
}
- public void map(K key, V val,
- OutputCollector<K,V> output, Reporter reporter)
- throws IOException {
- emit(key, val, output);
+ protected void emit(K key, V val, Context context)
+ throws IOException, InterruptedException {
+ ++total;
+ while((float) kept / total < keep) {
+ ++kept;
+ context.write(key, val);
+ }
}
-
}
+
+ public static class SampleMapper<K extends WritableComparable<?>,
+ V extends Writable>
+ extends SampleMapBase<K,V> {
- public static class SampleReducer<K extends WritableComparable, V extends Writable>
- extends SampleMapReduceBase<K,V> implements Reducer<K,V,K,V> {
-
- public void configure(JobConf job) {
- setKeep(job.getFloat("hadoop.sort.reduce.keep.percent", (float)100.0) /
- (float)100.0);
+ public void map(K key, V val, Context context)
+ throws IOException, InterruptedException {
+ emit(key, val, context);
}
- public void reduce(K key, Iterator<V> values,
- OutputCollector<K,V> output, Reporter reporter)
- throws IOException {
- while (values.hasNext()) {
- emit(key, values.next(), output);
+ }
+
+ public static class SampleReducer<K extends WritableComparable<?>,
+ V extends Writable>
+ extends SampleReduceBase<K,V> {
+
+ public void reduce(K key, Iterable<V> values, Context context)
+ throws IOException, InterruptedException {
+ for (V value : values) {
+ emit(key, value, context);
}
}
@@ -373,9 +395,9 @@
* Obscures the InputFormat and location information to simulate maps
* reading input from arbitrary locations ("indirect" reads).
*/
- static class IndirectInputFormat implements InputFormat {
+ static class IndirectInputFormat<K, V> extends InputFormat<K, V> {
- static class IndirectSplit implements InputSplit {
+ static class IndirectSplit extends InputSplit {
Path file;
long len;
public IndirectSplit() { }
@@ -398,32 +420,34 @@
}
}
- public InputSplit[] getSplits(JobConf job, int numSplits)
+ public List<InputSplit> getSplits(JobContext job)
throws IOException {
- Path src = new Path(job.get("mapred.indirect.input.file", null));
- FileSystem fs = src.getFileSystem(job);
+ Configuration conf = job.getConfiguration();
+ Path src = new Path(conf.get("mapred.indirect.input.file", null));
+ FileSystem fs = src.getFileSystem(conf);
- ArrayList<IndirectSplit> splits = new ArrayList<IndirectSplit>(numSplits);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
LongWritable key = new LongWritable();
Text value = new Text();
- for (SequenceFile.Reader sl = new SequenceFile.Reader(fs, src, job);
+ for (SequenceFile.Reader sl = new SequenceFile.Reader(fs, src, conf);
sl.next(key, value);) {
splits.add(new IndirectSplit(new Path(value.toString()), key.get()));
}
- return splits.toArray(new IndirectSplit[splits.size()]);
+ return splits;
}
- public RecordReader getRecordReader(InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
- InputFormat indirIF = (InputFormat)ReflectionUtils.newInstance(
- job.getClass("mapred.indirect.input.format",
- SequenceFileInputFormat.class), job);
+ @SuppressWarnings("unchecked")
+ public RecordReader<K, V> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ InputFormat<K, V> indirIF = (InputFormat)ReflectionUtils.newInstance(
+ conf.getClass("mapred.indirect.input.format",
+ SequenceFileInputFormat.class), conf);
IndirectSplit is = ((IndirectSplit)split);
- return indirIF.getRecordReader(new FileSplit(is.getPath(), 0,
- is.getLength(), (String[])null),
- job, reporter);
+ return indirIF.createRecordReader(new FileSplit(is.getPath(), 0,
+ is.getLength(), (String[])null), context);
}
}
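The load-generator rewrite shows the canonical Mapper translation: configure(JobConf) becomes setup(Context), and the OutputCollector/Reporter pair collapses into the single Context. A condensed sketch of the sampling pattern above (the class name is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SamplingMapper extends Mapper<Text, Text, Text, Text> {
      private long total;
      private long kept;
      private float keep;

      @Override
      protected void setup(Context context) {
        // Was configure(JobConf job) with job.getFloat(...) in the old API.
        keep = context.getConfiguration()
            .getFloat("hadoop.sort.map.keep.percent", 100f) / 100f;
      }

      @Override
      protected void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {
        // Context.write replaces OutputCollector.collect; counters and status
        // would go through context.getCounter(...) / context.setStatus(...).
        ++total;
        while ((float) kept / total < keep) {
          ++kept;
          context.write(key, value);
        }
      }
    }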