Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/11/17 03:16:00 UTC
svn commit: r881096 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Author: cdouglas
Date: Tue Nov 17 02:16:00 2009
New Revision: 881096
URL: http://svn.apache.org/viewvc?rev=881096&view=rev
Log:
MAPREDUCE-1147. Add map output counters to new API. Contributed by Amar Kamat
Added:
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java
- copied, changed from r835291, hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
Removed:
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
Modified:
hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/MapTask.java
Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=881096&r1=881095&r2=881096&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Tue Nov 17 02:16:00 2009
@@ -818,3 +818,6 @@
HDFS-596. Fix memory leak in hdfsFreeFileInfo() for libhdfs.
(Zhang Bingjun via dhruba)
+
+ MAPREDUCE-1147. Add map output counters to new API. (Amar Kamat via
+ cdouglas)
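
For context, a minimal sketch (not part of this commit) of how a driver on the
new API can read the counter this change populates once a job completes; the
class name and configuration below are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class MapOutputCounterExample {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "counter-example");
    // ... set mapper, input/output formats and paths as usual ...
    job.waitForCompletion(true);
    // Before MAPREDUCE-1147, a map-only job on the new API left
    // MAP_OUTPUT_RECORDS at zero; with this change it reflects the
    // records the maps actually wrote.
    long mapOut = job.getCounters()
        .findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    System.out.println("MAP_OUTPUT_RECORDS = " + mapOut);
  }
}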
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/MapTask.java?rev=881096&r1=881095&r2=881096&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/MapTask.java Tue Nov 17 02:16:00 2009
@@ -493,6 +493,43 @@
}
}
+ private class NewDirectOutputCollector<K,V>
+ extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
+ private final org.apache.hadoop.mapreduce.RecordWriter out;
+
+ private final TaskReporter reporter;
+
+ private final Counters.Counter mapOutputRecordCounter;
+
+ @SuppressWarnings("unchecked")
+ NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
+ JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ this.reporter = reporter;
+ out = outputFormat.getRecordWriter(taskContext);
+ mapOutputRecordCounter =
+ reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void write(K key, V value)
+ throws IOException, InterruptedException {
+ reporter.progress();
+ out.write(key, value);
+ mapOutputRecordCounter.increment(1);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ reporter.progress();
+ if (out != null) {
+ out.close(context);
+ }
+ }
+ }
+
private class NewOutputCollector<K,V>
extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
private final MapOutputCollector<K,V> collector;
@@ -579,7 +616,8 @@
// get an output object
if (job.getNumReduceTasks() == 0) {
- output = outputFormat.getRecordWriter(taskContext);
+ output =
+ new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
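
The NewDirectOutputCollector above is essentially a counting decorator: for
map-only jobs it wraps the RecordWriter obtained from the OutputFormat,
reports progress, and increments MAP_OUTPUT_RECORDS on every write, which is
what the old direct path skipped. A standalone sketch of the same pattern
(the CountingRecordWriter name is illustrative, not from this commit, and it
uses the new-API Counter rather than the Counters.Counter the task code gets
from its TaskReporter):

import java.io.IOException;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

class CountingRecordWriter<K, V> extends RecordWriter<K, V> {
  private final RecordWriter<K, V> delegate;
  private final Counter records;

  CountingRecordWriter(RecordWriter<K, V> delegate, Counter records) {
    this.delegate = delegate;
    this.records = records;
  }

  @Override
  public void write(K key, V value) throws IOException, InterruptedException {
    delegate.write(key, value); // pass through to the real writer
    records.increment(1);       // count each record the map emits
  }

  @Override
  public void close(TaskAttemptContext context)
      throws IOException, InterruptedException {
    delegate.close(context);
  }
}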
Copied: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java (from r835291, hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java?p2=hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java&p1=hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java&r1=835291&r2=881096&rev=881096&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java Tue Nov 17 02:16:00 2009
@@ -23,18 +23,24 @@
import java.io.Writer;
import java.io.BufferedWriter;
import java.io.IOException;
-
-import junit.framework.TestCase;
+import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+
/**
- * This is an wordcount application that tests the count of records
- * got spilled to disk. It generates simple text input files. Then
+ * This is a wordcount application that tests job counters.
+ * It generates simple text input files. Then
* runs the wordcount map/reduce application on (1) 3 i/p files (with 3 maps
* and 1 reduce) and verifies the counters and (2) 4 i/p files (with 4 maps
* and 1 reduce) and verifies counters. Wordcount application reads the
@@ -42,14 +48,22 @@
* is a locally sorted list of words and the count of how often they occurred.
*
*/
-public class TestSpilledRecordsCounter extends TestCase {
+public class TestJobCounters {
- private void validateCounters(Counters counter, long spillRecCnt) {
+ String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+ File.separator + "tmp")).toString().replace(' ', '+');
+
+ private void validateCounters(Counters counter, long spillRecCnt,
+ long mapInputRecords, long mapOutputRecords) {
// Check if the number of Spilled Records is the same as expected
- assertEquals(counter.findCounter(TaskCounter.SPILLED_RECORDS).
- getCounter(), spillRecCnt);
+ assertEquals(spillRecCnt,
+ counter.findCounter(TaskCounter.SPILLED_RECORDS).getCounter());
+ assertEquals(mapInputRecords,
+ counter.findCounter(TaskCounter.MAP_INPUT_RECORDS).getCounter());
+ assertEquals(mapOutputRecords,
+ counter.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter());
}
-
+
private void createWordsFile(File inpFile) throws Exception {
Writer out = new BufferedWriter(new FileWriter(inpFile));
try {
@@ -58,7 +72,8 @@
for (int i = 0; i < REPLICAS; i++) {
for (int j = 1; j <= NUMLINES*NUMWORDSPERLINE; j+=NUMWORDSPERLINE) {
- out.write("word" + j + " word" + (j+1) + " word" + (j+2) + " word" + (j+3) + '\n');
+ out.write("word" + j + " word" + (j+1) + " word" + (j+2)
+ + " word" + (j+3) + '\n');
}
}
} finally {
@@ -73,9 +88,10 @@
* @throws IOException When there are communication problems with the
* job tracker.
*/
- public void testSpillCounter() throws Exception {
- JobConf conf = new JobConf(TestSpilledRecordsCounter.class);
- conf.setJobName("wordcountSpilledRecordsCounter");
+ @Test
+ public void testOldJobWithMapAndReducers() throws Exception {
+ JobConf conf = new JobConf(TestJobCounters.class);
+ conf.setJobName("wordcount-map-reducers");
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
@@ -93,42 +109,34 @@
conf.set(JobContext.MAP_SORT_RECORD_PERCENT, "0.05");
conf.set(JobContext.MAP_SORT_SPILL_PERCENT, "0.80");
-
- String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
- File.separator + "tmp"))
- .toString().replace(' ', '+');
- conf.set("test.build.data", TEST_ROOT_DIR);
- String IN_DIR = TEST_ROOT_DIR + File.separator +
- "spilledRecords.countertest" + File.separator +
- "genins" + File.separator;
- String OUT_DIR = TEST_ROOT_DIR + File.separator +
- "spilledRecords.countertest" + File.separator;
-
FileSystem fs = FileSystem.get(conf);
- Path testdir = new Path(TEST_ROOT_DIR, "spilledRecords.countertest");
+ Path testDir = new Path(TEST_ROOT_DIR, "countertest");
+ conf.set("test.build.data", testDir.toString());
try {
- if (fs.exists(testdir)) {
- fs.delete(testdir, true);
+ if (fs.exists(testDir)) {
+ fs.delete(testDir, true);
}
- if (!fs.mkdirs(testdir)) {
- throw new IOException("Mkdirs failed to create " + testdir.toString());
+ if (!fs.mkdirs(testDir)) {
+ throw new IOException("Mkdirs failed to create " + testDir.toString());
}
- Path wordsIns = new Path(testdir, "genins");
+ String inDir = testDir + File.separator + "genins" + File.separator;
+ String outDir = testDir + File.separator;
+ Path wordsIns = new Path(inDir);
if (!fs.mkdirs(wordsIns)) {
throw new IOException("Mkdirs failed to create " + wordsIns.toString());
}
//create 3 input files each with 5*2k words
- File inpFile = new File(IN_DIR + "input5_2k_1");
+ File inpFile = new File(inDir + "input5_2k_1");
createWordsFile(inpFile);
- inpFile = new File(IN_DIR + "input5_2k_2");
+ inpFile = new File(inDir + "input5_2k_2");
createWordsFile(inpFile);
- inpFile = new File(IN_DIR + "input5_2k_3");
+ inpFile = new File(inDir + "input5_2k_3");
createWordsFile(inpFile);
- FileInputFormat.setInputPaths(conf, IN_DIR);
- Path outputPath1=new Path(OUT_DIR, "output5_2k_3");
+ FileInputFormat.setInputPaths(conf, inDir);
+ Path outputPath1 = new Path(outDir, "output5_2k_3");
FileOutputFormat.setOutputPath(conf, outputPath1);
RunningJob myJob = JobClient.runJob(conf);
@@ -145,13 +153,16 @@
// So 0 records spilled to disk in 3rd level)
// So total of 6k+4k=10k
// Total job counter will be 54k+10k = 64k
- validateCounters(c1, 64000);
+
+ //3 maps and 2.5k lines --- So total 7.5k map input records
+ //3 maps and 10k words in each --- So total of 30k map output recs
+ validateCounters(c1, 64000, 7500, 30000);
//create 4th input file each with 5*2k words and test with 4 maps
- inpFile = new File(IN_DIR + "input5_2k_4");
+ inpFile = new File(inDir + "input5_2k_4");
createWordsFile(inpFile);
conf.setNumMapTasks(4);
- Path outputPath2=new Path(OUT_DIR, "output5_2k_4");
+ Path outputPath2 = new Path(outDir, "output5_2k_4");
FileOutputFormat.setOutputPath(conf, outputPath2);
myJob = JobClient.runJob(conf);
@@ -168,12 +179,184 @@
// So 0 records spilled to disk in 3rd level)
// So total of 8k+8k=16k
// Total job counter will be 72k+16k = 88k
- validateCounters(c1, 88000);
+
+ // 4 maps and 2.5k lines in each --- So 10k map input records
+ // 4 maps and 10k unique words --- So 40k map output records
+ validateCounters(c1, 88000, 10000, 40000);
+
+ // check for a map only job
+ conf.setNumReduceTasks(0);
+ Path outputPath3 = new Path(outDir, "output5_2k_5");
+ FileOutputFormat.setOutputPath(conf, outputPath3);
+
+ myJob = JobClient.runJob(conf);
+ c1 = myJob.getCounters();
+ // 4 maps and 2.5k lines in each --- So 10k map input records
+ // 4 maps and 10k unique words --- So 40k map output records
+ validateCounters(c1, 0, 10000, 40000);
+ } finally {
+ //clean up the input and output files
+ if (fs.exists(testDir)) {
+ fs.delete(testDir, true);
+ }
+ }
+ }
+
+ public static class NewMapTokenizer
+ extends Mapper<Object, Text, Text, IntWritable> {
+ private final static IntWritable one = new IntWritable(1);
+ private Text word = new Text();
+
+ public void map(Object key, Text value, Context context)
+ throws IOException, InterruptedException {
+ StringTokenizer itr = new StringTokenizer(value.toString());
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ context.write(word, one);
+ }
+ }
+ }
+
+ public static class NewIdentityReducer
+ extends Reducer<Text, IntWritable, Text, IntWritable> {
+ private IntWritable result = new IntWritable();
+
+ public void reduce(Text key, Iterable<IntWritable> values,
+ Context context) throws IOException, InterruptedException {
+ int sum = 0;
+ for (IntWritable val : values) {
+ sum += val.get();
+ }
+ result.set(sum);
+ context.write(key, result);
+ }
+ }
+
+ /**
+ * The main driver for the word count map/reduce program.
+ * Invoke this method to submit the map/reduce job.
+ * @throws IOException When there are communication problems with the
+ * job tracker.
+ */
+ @Test
+ public void testNewJobWithMapAndReducers() throws Exception {
+ JobConf conf = new JobConf(TestJobCounters.class);
+ conf.setInt(JobContext.IO_SORT_MB, 1);
+ conf.setInt(JobContext.IO_SORT_FACTOR, 2);
+ conf.set(JobContext.MAP_SORT_RECORD_PERCENT, "0.05");
+ conf.set(JobContext.MAP_SORT_SPILL_PERCENT, "0.80");
+
+ FileSystem fs = FileSystem.get(conf);
+ Path testDir = new Path(TEST_ROOT_DIR, "countertest2");
+ conf.set("test.build.data", testDir.toString());
+ try {
+ if (fs.exists(testDir)) {
+ fs.delete(testDir, true);
+ }
+ if (!fs.mkdirs(testDir)) {
+ throw new IOException("Mkdirs failed to create " + testDir.toString());
+ }
+
+ String inDir = testDir + File.separator + "genins" + File.separator;
+ Path wordsIns = new Path(inDir);
+ if (!fs.mkdirs(wordsIns)) {
+ throw new IOException("Mkdirs failed to create " + wordsIns.toString());
+ }
+ String outDir = testDir + File.separator;
+
+ //create 3 input files each with 5*2k words
+ File inpFile = new File(inDir + "input5_2k_1");
+ createWordsFile(inpFile);
+ inpFile = new File(inDir + "input5_2k_2");
+ createWordsFile(inpFile);
+ inpFile = new File(inDir + "input5_2k_3");
+ createWordsFile(inpFile);
+
+ FileInputFormat.setInputPaths(conf, inDir);
+ Path outputPath1 = new Path(outDir, "output5_2k_3");
+ FileOutputFormat.setOutputPath(conf, outputPath1);
+
+ Job job = new Job(conf);
+ job.setJobName("wordcount-map-reducers");
+
+ // the keys are words (strings)
+ job.setOutputKeyClass(Text.class);
+ // the values are counts (ints)
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setMapperClass(NewMapTokenizer.class);
+ job.setCombinerClass(NewIdentityReducer.class);
+ job.setReducerClass(NewIdentityReducer.class);
+
+ job.setNumReduceTasks(1);
+
+ job.waitForCompletion(false);
+
+ org.apache.hadoop.mapreduce.Counters c1 = job.getCounters();
+ // 3 maps & in each map, 4 first level spills --- So total 12.
+ // spilled records count:
+ // Each Map: 1st level:2k+2k+2k+2k=8k; 2nd level=4k+4k=8k;
+ // 3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
+ // So total 8k+8k+2k=18k
+ // For 3 Maps, total = 3*18=54k
+ // Reduce: each of the 3 map o/p's (2k each) will be spilled in shuffleToDisk()
+ // So 3*2k=6k in 1st level; 2nd level:4k(2k+2k);
+ // 3rd level directly given to reduce(4k+2k --- combineAndSpill => 2k.
+ // So 0 records spilled to disk in 3rd level)
+ // So total of 6k+4k=10k
+ // Total job counter will be 54k+10k = 64k
+
+ //3 maps and 2.5k lines --- So total 7.5k map input records
+ //3 maps and 10k words in each --- So total of 30k map output recs
+ validateCounters(Counters.downgrade(c1), 64000, 7500, 30000);
+
+ //create 4th input file each with 5*2k words and test with 4 maps
+ inpFile = new File(inDir + "input5_2k_4");
+ createWordsFile(inpFile);
+ JobConf newJobConf = new JobConf(job.getConfiguration());
+
+ Path outputPath2 = new Path(outDir, "output5_2k_4");
+
+ FileOutputFormat.setOutputPath(newJobConf, outputPath2);
+
+ Job newJob = new Job(newJobConf);
+ newJob.waitForCompletion(false);
+ c1 = newJob.getCounters();
+ // 4 maps & in each map 4 first level spills --- So total 16.
+ // spilled records count:
+ // Each Map: 1st level:2k+2k+2k+2k=8k; 2nd level=4k+4k=8k;
+ // 3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
+ // So total 8k+8k+2k=18k
+ // For 4 Maps, total = 4*18=72k
+ // Reduce: each of the 4 map o/p's (2k each) will be spilled in shuffleToDisk()
+ // So 4*2k=8k in 1st level; 2nd level:4k+4k=8k;
+ // 3rd level directly given to reduce(4k+4k --- combineAndSpill => 2k.
+ // So 0 records spilled to disk in 3rd level)
+ // So total of 8k+8k=16k
+ // Total job counter will be 72k+16k = 88k
+
+ // 4 maps and 2.5k lines in each --- So 10k map input records
+ // 4 maps and 10k unique words --- So 40k map output records
+ validateCounters(Counters.downgrade(c1), 88000, 10000, 40000);
+
+ JobConf newJobConf2 = new JobConf(newJob.getConfiguration());
+
+ Path outputPath3 = new Path(outDir, "output5_2k_5");
+
+ FileOutputFormat.setOutputPath(newJobConf2, outputPath3);
+
+ Job newJob2 = new Job(newJobConf2);
+ newJob2.setNumReduceTasks(0);
+ newJob2.waitForCompletion(false);
+ c1 = newJob2.getCounters();
+ // 4 maps and 2.5k lines in each --- So 10k map input records
+ // 4 maps and 10k unique words --- So 40k map output records
+ validateCounters(Counters.downgrade(c1), 0, 10000, 40000);
} finally {
//clean up the input and output files
- if (fs.exists(testdir)) {
- fs.delete(testdir, true);
+ if (fs.exists(testDir)) {
+ fs.delete(testDir, true);
}
}
}
-}
+}
\ No newline at end of file
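
The spill accounting in the comments above is dense; restated for the 3-map
case as a small sketch (numbers taken directly from the comments, 2k records
per first-level spill; the class name is illustrative):

public class SpillArithmetic {
  public static void main(String[] args) {
    // Per map: four 2k first-level spills, two 4k second-level merges,
    // and a 2k third-level combineAndSpill.
    int perMap = (4 * 2000) + (2 * 4000) + 2000;  // 18k
    int mapSide = 3 * perMap;                     // 3 * 18k = 54k
    // Reduce: 3 x 2k map outputs spilled in shuffleToDisk (6k), then a
    // 4k second-level merge; the third level goes straight to reduce.
    int reduceSide = (3 * 2000) + 4000;           // 10k
    System.out.println(mapSide + reduceSide);     // 64000, as asserted
  }
}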