Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/11/17 03:15:49 UTC

svn commit: r881095 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java

Author: cdouglas
Date: Tue Nov 17 02:15:49 2009
New Revision: 881095

URL: http://svn.apache.org/viewvc?rev=881095&view=rev
Log:
MAPREDUCE-1147. Add map output counters to new API. Contributed by Amar Kamat
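
Background: before this change, a map-only job using the new
(org.apache.hadoop.mapreduce) API wrote straight through the OutputFormat's
RecordWriter, so MAP_OUTPUT_RECORDS stayed at zero; the patch below wraps that
writer so the counter is incremented on every write. A minimal sketch of how a
client could observe the counter after this change -- the job wiring here is
hypothetical, while TaskCounter.MAP_OUTPUT_RECORDS is the real enum:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCounter;

    public class MapOutputCounterCheck {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        // Map-only: output now goes through NewDirectOutputCollector.
        job.setNumReduceTasks(0);
        // ... hypothetical: set mapper class, input and output paths here ...
        job.waitForCompletion(true);
        long outRecs = job.getCounters()
            .findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
        System.out.println("MAP_OUTPUT_RECORDS = " + outRecs);
      }
    }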

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java
      - copied, changed from r835291, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
Removed:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=881095&r1=881094&r2=881095&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Nov 17 02:15:49 2009
@@ -885,3 +885,6 @@
 
     MAPREDUCE-1068. Fix streaming job to show proper message if file is
     not present. (Amareshwari Sriramadasu via sharad)
+
+    MAPREDUCE-1147. Add map output counters to new API. (Amar Kamat via
+    cdouglas)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=881095&r1=881094&r2=881095&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Tue Nov 17 02:15:49 2009
@@ -493,6 +493,43 @@
     }
   }
 
+  private class NewDirectOutputCollector<K,V>
+  extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
+    private final org.apache.hadoop.mapreduce.RecordWriter out;
+
+    private final TaskReporter reporter;
+
+    private final Counters.Counter mapOutputRecordCounter;
+    
+    @SuppressWarnings("unchecked")
+    NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
+        JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) 
+    throws IOException, ClassNotFoundException, InterruptedException {
+      this.reporter = reporter;
+      out = outputFormat.getRecordWriter(taskContext);
+      mapOutputRecordCounter = 
+        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void write(K key, V value) 
+    throws IOException, InterruptedException {
+      reporter.progress();
+      out.write(key, value);
+      mapOutputRecordCounter.increment(1);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) 
+    throws IOException,InterruptedException {
+      reporter.progress();
+      if (out != null) {
+        out.close(context);
+      }
+    }
+  }
+  
   private class NewOutputCollector<K,V>
     extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
     private final MapOutputCollector<K,V> collector;
@@ -579,7 +616,8 @@
     
     // get an output object
     if (job.getNumReduceTasks() == 0) {
-      output = outputFormat.getRecordWriter(taskContext);
+      output = 
+        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
     } else {
       output = new NewOutputCollector(taskContext, job, umbilical, reporter);
     }

Copied: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java (from r835291, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java?p2=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java&p1=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java&r1=835291&r2=881095&rev=881095&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java Tue Nov 17 02:15:49 2009
@@ -23,18 +23,24 @@
 import java.io.Writer;
 import java.io.BufferedWriter;
 import java.io.IOException;
-
-import junit.framework.TestCase;
+import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskCounter;
 
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+
 /**
- * This is an wordcount application that tests the count of records
- * got spilled to disk. It generates simple text input files. Then
+ * This is a wordcount application that tests job counters.
+ * It generates simple text input files. Then
  * runs the wordcount map/reduce application on (1) 3 i/p files(with 3 maps
  * and 1 reduce) and verifies the counters and (2) 4 i/p files(with 4 maps
  * and 1 reduce) and verifies counters. Wordcount application reads the
@@ -42,14 +48,22 @@
  * is a locally sorted list of words and the count of how often they occurred.
  *
  */
-public class TestSpilledRecordsCounter extends TestCase {
+public class TestJobCounters {
 
-  private void validateCounters(Counters counter, long spillRecCnt) {
+  String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+                          File.separator + "tmp")).toString().replace(' ', '+');
+
+  private void validateCounters(Counters counter, long spillRecCnt, 
+                                long mapInputRecords, long mapOutputRecords) {
      // Check if the number of Spilled Records is the same as expected
-      assertEquals(counter.findCounter(TaskCounter.SPILLED_RECORDS).
-                     getCounter(), spillRecCnt);
+      assertEquals(spillRecCnt,
+          counter.findCounter(TaskCounter.SPILLED_RECORDS).getCounter());
+      assertEquals(mapInputRecords,
+          counter.findCounter(TaskCounter.MAP_INPUT_RECORDS).getCounter());
+      assertEquals(mapOutputRecords,
+          counter.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter());
   }
-
+  
   private void createWordsFile(File inpFile) throws Exception {
     Writer out = new BufferedWriter(new FileWriter(inpFile));
     try {
@@ -58,7 +72,8 @@
 
       for (int i = 0; i < REPLICAS; i++) {
         for (int j = 1; j <= NUMLINES*NUMWORDSPERLINE; j+=NUMWORDSPERLINE) {
-          out.write("word" + j + " word" + (j+1) + " word" + (j+2) + " word" + (j+3) + '\n');
+          out.write("word" + j + " word" + (j+1) + " word" + (j+2) 
+                    + " word" + (j+3) + '\n');
         }
       }
     } finally {
@@ -73,9 +88,10 @@
   * @throws IOException When there are communication problems with the
    *                     job tracker.
    */
-  public void testSpillCounter() throws Exception {
-    JobConf conf = new JobConf(TestSpilledRecordsCounter.class);
-    conf.setJobName("wordcountSpilledRecordsCounter");
+  @Test
+  public void testOldJobWithMapAndReducers() throws Exception {
+    JobConf conf = new JobConf(TestJobCounters.class);
+    conf.setJobName("wordcount-map-reducers");
 
     // the keys are words (strings)
     conf.setOutputKeyClass(Text.class);
@@ -93,42 +109,34 @@
     conf.set(JobContext.MAP_SORT_RECORD_PERCENT, "0.05");
     conf.set(JobContext.MAP_SORT_SPILL_PERCENT, "0.80");
 
-
-    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
-                                      File.separator + "tmp"))
-                               .toString().replace(' ', '+');
-    conf.set("test.build.data", TEST_ROOT_DIR);
-    String IN_DIR = TEST_ROOT_DIR + File.separator +
-                      "spilledRecords.countertest" +  File.separator +
-                      "genins" + File.separator;
-    String OUT_DIR = TEST_ROOT_DIR + File.separator +
-                      "spilledRecords.countertest" + File.separator;
-
     FileSystem fs = FileSystem.get(conf);
-    Path testdir = new Path(TEST_ROOT_DIR, "spilledRecords.countertest");
+    Path testDir = new Path(TEST_ROOT_DIR, "countertest");
+    conf.set("test.build.data", testDir.toString());
     try {
-      if (fs.exists(testdir)) {
-        fs.delete(testdir, true);
+      if (fs.exists(testDir)) {
+        fs.delete(testDir, true);
       }
-      if (!fs.mkdirs(testdir)) {
-        throw new IOException("Mkdirs failed to create " + testdir.toString());
+      if (!fs.mkdirs(testDir)) {
+        throw new IOException("Mkdirs failed to create " + testDir.toString());
       }
 
-      Path wordsIns = new Path(testdir, "genins");
+      String inDir = testDir +  File.separator + "genins" + File.separator;
+      String outDir = testDir + File.separator;
+      Path wordsIns = new Path(inDir);
       if (!fs.mkdirs(wordsIns)) {
         throw new IOException("Mkdirs failed to create " + wordsIns.toString());
       }
 
       //create 3 input files each with 5*2k words
-      File inpFile = new File(IN_DIR + "input5_2k_1");
+      File inpFile = new File(inDir + "input5_2k_1");
       createWordsFile(inpFile);
-      inpFile = new File(IN_DIR + "input5_2k_2");
+      inpFile = new File(inDir + "input5_2k_2");
       createWordsFile(inpFile);
-      inpFile = new File(IN_DIR + "input5_2k_3");
+      inpFile = new File(inDir + "input5_2k_3");
       createWordsFile(inpFile);
 
-      FileInputFormat.setInputPaths(conf, IN_DIR);
-      Path outputPath1=new Path(OUT_DIR, "output5_2k_3");
+      FileInputFormat.setInputPaths(conf, inDir);
+      Path outputPath1 = new Path(outDir, "output5_2k_3");
       FileOutputFormat.setOutputPath(conf, outputPath1);
 
       RunningJob myJob = JobClient.runJob(conf);
@@ -145,13 +153,16 @@
       //         So 0 records spilled to disk in 3rd level)
       //         So total of 6k+4k=10k
       // Total job counter will be 54k+10k = 64k
-      validateCounters(c1, 64000);
+      
+      //3 maps and 2.5k lines --- So total 7.5k map input records
+      //3 maps and 10k words in each --- So total of 30k map output recs
+      validateCounters(c1, 64000, 7500, 30000);
 
       //create 4th input file each with 5*2k words and test with 4 maps
-      inpFile = new File(IN_DIR + "input5_2k_4");
+      inpFile = new File(inDir + "input5_2k_4");
       createWordsFile(inpFile);
       conf.setNumMapTasks(4);
-      Path outputPath2=new Path(OUT_DIR, "output5_2k_4");
+      Path outputPath2 = new Path(outDir, "output5_2k_4");
       FileOutputFormat.setOutputPath(conf, outputPath2);
 
       myJob = JobClient.runJob(conf);
@@ -168,12 +179,184 @@
       //         So 0 records spilled to disk in 3rd level)
       //         So total of 8k+8k=16k
       // Total job counter will be 72k+16k = 88k
-      validateCounters(c1, 88000);
+      
+      // 4 maps and 2.5k lines in each --- So 10k map input records
+      // 4 maps and 10k words in each --- So 40k map output records
+      validateCounters(c1, 88000, 10000, 40000);
+      
+      // check for a map only job
+      conf.setNumReduceTasks(0);
+      Path outputPath3 = new Path(outDir, "output5_2k_5");
+      FileOutputFormat.setOutputPath(conf, outputPath3);
+
+      myJob = JobClient.runJob(conf);
+      c1 = myJob.getCounters();
+      // 4 maps and 2.5k lines in each --- So 10k map input records
+      // 4 maps and 10k words in each --- So 40k map output records
+      validateCounters(c1, 0, 10000, 40000);
+    } finally {
+      //clean up the input and output files
+      if (fs.exists(testDir)) {
+        fs.delete(testDir, true);
+      }
+    }
+  }
+  
+  public static class NewMapTokenizer 
+  extends Mapper<Object, Text, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context) 
+    throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+  
+  public static class NewIdentityReducer  
+  extends Reducer<Text, IntWritable, Text, IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values, 
+                       Context context) throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+  
+  /**
+   * The main driver for the word count map/reduce program.
+   * Invoke this method to submit the map/reduce job.
+   * @throws IOException When there are communication problems with the
+   *                     job tracker.
+   */
+  @Test
+  public void testNewJobWithMapAndReducers() throws Exception {
+    JobConf conf = new JobConf(TestJobCounters.class);
+    conf.setInt(JobContext.IO_SORT_MB, 1);
+    conf.setInt(JobContext.IO_SORT_FACTOR, 2);
+    conf.set(JobContext.MAP_SORT_RECORD_PERCENT, "0.05");
+    conf.set(JobContext.MAP_SORT_SPILL_PERCENT, "0.80");
+
+    FileSystem fs = FileSystem.get(conf);
+    Path testDir = new Path(TEST_ROOT_DIR, "countertest2");
+    conf.set("test.build.data", testDir.toString());
+    try {
+      if (fs.exists(testDir)) {
+        fs.delete(testDir, true);
+      }
+      if (!fs.mkdirs(testDir)) {
+        throw new IOException("Mkdirs failed to create " + testDir.toString());
+      }
+
+      String inDir = testDir +  File.separator + "genins" + File.separator;
+      Path wordsIns = new Path(inDir);
+      if (!fs.mkdirs(wordsIns)) {
+        throw new IOException("Mkdirs failed to create " + wordsIns.toString());
+      }
+      String outDir = testDir + File.separator;
+
+      //create 3 input files each with 5*2k words
+      File inpFile = new File(inDir + "input5_2k_1");
+      createWordsFile(inpFile);
+      inpFile = new File(inDir + "input5_2k_2");
+      createWordsFile(inpFile);
+      inpFile = new File(inDir + "input5_2k_3");
+      createWordsFile(inpFile);
+
+      FileInputFormat.setInputPaths(conf, inDir);
+      Path outputPath1 = new Path(outDir, "output5_2k_3");
+      FileOutputFormat.setOutputPath(conf, outputPath1);
+      
+      Job job = new Job(conf);
+      job.setJobName("wordcount-map-reducers");
+
+      // the keys are words (strings)
+      job.setOutputKeyClass(Text.class);
+      // the values are counts (ints)
+      job.setOutputValueClass(IntWritable.class);
+
+      job.setMapperClass(NewMapTokenizer.class);
+      job.setCombinerClass(NewIdentityReducer.class);
+      job.setReducerClass(NewIdentityReducer.class);
+
+      job.setNumReduceTasks(1);
+
+      job.waitForCompletion(false);
+      
+      org.apache.hadoop.mapreduce.Counters c1 = job.getCounters();
+      // 3 maps & in each map, 4 first level spills --- So total 12.
+      // spilled records count:
+      // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
+      //           3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
+      //           So total 8k+8k+2k=18k
+      // For 3 Maps, total = 3*18=54k
+      // Reduce: each of the 3 map o/p's(2k each) will be spilled in shuffleToDisk()
+      //         So 3*2k=6k in 1st level; 2nd level:4k(2k+2k);
+      //         3rd level directly given to reduce(4k+2k --- combineAndSpill => 2k.
+      //         So 0 records spilled to disk in 3rd level)
+      //         So total of 6k+4k=10k
+      // Total job counter will be 54k+10k = 64k
+      
+      //3 maps and 2.5k lines --- So total 7.5k map input records
+      //3 maps and 10k words in each --- So total of 30k map output recs
+      validateCounters(Counters.downgrade(c1), 64000, 7500, 30000);
+
+      //create 4th input file each with 5*2k words and test with 4 maps
+      inpFile = new File(inDir + "input5_2k_4");
+      createWordsFile(inpFile);
+      JobConf newJobConf = new JobConf(job.getConfiguration());
+      
+      Path outputPath2 = new Path(outDir, "output5_2k_4");
+      
+      FileOutputFormat.setOutputPath(newJobConf, outputPath2);
+
+      Job newJob = new Job(newJobConf);
+      newJob.waitForCompletion(false);
+      c1 = newJob.getCounters();
+      // 4 maps & in each map, 4 first level spills --- So total 16.
+      // spilled records count:
+      // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
+      //           3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
+      //           So total 8k+8k+2k=18k
+      // For 4 Maps, total = 4*18=72k
+      // Reduce: each of the 4 map o/p's(2k each) will be spilled in shuffleToDisk()
+      //         So 4*2k=8k in 1st level; 2nd level:4k+4k=8k;
+      //         3rd level directly given to reduce(4k+4k --- combineAndSpill => 2k.
+      //         So 0 records spilled to disk in 3rd level)
+      //         So total of 8k+8k=16k
+      // Total job counter will be 72k+16k = 88k
+      
+      // 4 maps and 2.5k lines in each --- So 10k map input records
+      // 4 maps and 10k words in each --- So 40k map output records
+      validateCounters(Counters.downgrade(c1), 88000, 10000, 40000);
+      
+      JobConf newJobConf2 = new JobConf(newJob.getConfiguration());
+      
+      Path outputPath3 = new Path(outDir, "output5_2k_5");
+      
+      FileOutputFormat.setOutputPath(newJobConf2, outputPath3);
+
+      Job newJob2 = new Job(newJobConf2);
+      newJob2.setNumReduceTasks(0);
+      newJob2.waitForCompletion(false);
+      c1 = newJob2.getCounters();
+      // 4 maps and 2.5k lines in each --- So 10k map input records
+      // 4 maps and 10k words in each --- So 40k map output records
+      validateCounters(Counters.downgrade(c1), 0, 10000, 40000);
     } finally {
       //clean up the input and output files
-      if (fs.exists(testdir)) {
-        fs.delete(testdir, true);
+      if (fs.exists(testDir)) {
+        fs.delete(testDir, true);
       }
     }
   }
-}
+}
\ No newline at end of file
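
For reference, the spilled-record arithmetic repeated in the test comments
above condenses to a short calculation. A hedged check under the test's stated
assumptions (io.sort.mb=1 yields four first-level spills of about 2k records
per map; io.sort.factor=2 forces two merge levels):

    // Worked check of the expected SPILLED_RECORDS totals from the test
    // comments; the 2k-records-per-spill figure is the test's assumption.
    public class SpillMath {
      static long perMapSpills() {
        long firstLevel = 4 * 2000;    // four 2k spills        =  8k
        long secondLevel = 2 * 4000;   // two 4k merge outputs  =  8k
        long thirdLevel = 2000;        // final combineAndSpill =  2k
        return firstLevel + secondLevel + thirdLevel;   // 18k per map
      }

      public static void main(String[] args) {
        long threeMaps = 3 * perMapSpills();   // 54k on the map side
        long reduce3 = 3 * 2000 + 4000;        // 6k shuffled + 4k merged = 10k
        System.out.println(threeMaps + reduce3);   // 64000, matches the test

        long fourMaps = 4 * perMapSpills();    // 72k on the map side
        long reduce4 = 4 * 2000 + 8000;        // 8k shuffled + 8k merged = 16k
        System.out.println(fourMaps + reduce4);    // 88000, matches the test
      }
    }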