You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by mc...@apache.org on 2006/03/05 05:40:20 UTC

svn commit: r383279 - /lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java

Author: mc
Date: Sat Mar  4 20:40:18 2006
New Revision: 383279

URL: http://svn.apache.org/viewcvs?rev=383279&view=rev
Log:

  The MapredLoadTest now has an extra step, to exercise
the case where we have multiple reduce tasks.

  It used to have two stages: one job that created a huge
file of numbers in random order, followed by a job that
would read that file and count the numbers.  If the final
count was correct, the test passed.

  Unfortunately, neither of these jobs ran with more than
one reduce task.

  So now we've got three stages.  The first stage is
unchanged.  The second stage reads the big file, then
emits the answer key split into 10 parts, one for each
reduce task.  Then a third stage merges those parts into
a final number count.  As before, if that final count 
is correct, all is well.


Modified:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java?rev=383279&r1=383278&r2=383279&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MapredLoadTest.java Sat Mar  4 20:40:18 2006
@@ -56,6 +56,29 @@
  *
  **********************************************************/
 public class MapredLoadTest {
+    /**
+     * The RandomGen Job does the actual work of creating
+     * a huge file of assorted numbers.  It receives instructions
+     * as to how many times each number should be counted.  Then
+     * it emits those numbers in a crazy order.
+     *
+     * The map() function takes a key/val pair that describes
+     * a value-to-be-emitted (the key) and how many times it 
+     * should be emitted (the value), aka "numtimes".  map() then
+     * emits a series of intermediate key/val pairs.  It emits
+     * 'numtimes' of these.  The key is a random number and the
+     * value is the 'value-to-be-emitted'.
+     *
+     * The system collates and merges these pairs according to
+     * the random number.  The reduce() function takes in a key/value
+     * pair that consists of a crazy random number and a series
+     * of values that should be emitted.  The random number key
+     * is now dropped, and reduce() emits a pair for every intermediate value.
+     * The emitted key is an intermediate value.  The emitted value
+     * is just a blank string.  Thus, we've created a huge file
+     * of numbers in random order, but where each number appears
+     * as many times as we were instructed.
+     */
     static class RandomGenMapper implements Mapper {
         Random r = new Random();
         public void configure(JobConf job) {
@@ -69,10 +92,11 @@
                 out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
             }
         }
-				public void close() {
-				}
-
+        public void close() {
+        }
     }
+    /**
+     */
     static class RandomGenReducer implements Reducer {
         public void configure(JobConf job) {
         }
@@ -84,9 +108,26 @@
                 out.collect(new UTF8("" + val), new UTF8(""));
             }
         }
-				public void close() {
-				}
+        public void close() {
+        }
     }
+
+    /**
+     * The RandomCheck Job does a lot of our work.  It takes
+     * in a num/string keyspace, and transforms it into a
+     * key/count(int) keyspace.
+     *
+     * The map() function just emits a num/1 pair for every
+     * num/string input pair.
+     *
+     * The reduce() function sums up all the 1s that were
+     * emitted for a single key.  It then emits the key/total
+     * pair.
+     *
+     * This is used to regenerate the random number "answer key".
+     * Each key here is a random number, and the count is the
+     * number of times the number was emitted.
+     */
     static class RandomCheckMapper implements Mapper {
         public void configure(JobConf job) {
         }
@@ -97,9 +138,11 @@
 
             out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
         }
-				public void close() {
-				}
+        public void close() {
+        }
     }
+    /**
+     */
     static class RandomCheckReducer implements Reducer {
         public void configure(JobConf job) {
         }
@@ -113,8 +156,45 @@
             }
             out.collect(new IntWritable(keyint), new IntWritable(count));
         }
-			public void close() {
-			}
+        public void close() {
+        }
+    }
+
+    /**
+     * The Merge Job is a really simple one.  It takes in
+     * an int/int key-value set, and emits the same set.
+     * But it merges identical keys by adding their values.
+     *
+     * Thus, the map() function is just the identity function
+     * and reduce() just sums.  Nothing to see here!
+     */
+    static class MergeMapper implements Mapper {
+        public void configure(JobConf job) {
+        }
+
+        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+            int keyint = ((IntWritable) key).get();
+            int valint = ((IntWritable) val).get();
+
+            out.collect(new IntWritable(keyint), new IntWritable(valint));
+        }
+        public void close() {
+        }
+    }
+    static class MergeReducer implements Reducer {
+        public void configure(JobConf job) {
+        }
+        
+        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+            int keyint = ((IntWritable) key).get();
+            int total = 0;
+            while (it.hasNext()) {
+                total += ((IntWritable) it.next()).get();
+            }
+            out.collect(new IntWritable(keyint), new IntWritable(total));
+        }
+        public void close() {
+        }
     }
 
     int range;
@@ -209,7 +289,8 @@
 
         //
         // Next, we read the big file in and regenerate the 
-        // original map.
+        // original map.  It's split into a number of parts.
+        // (That number is 'intermediateReduces'.)
         //
         // We have many map tasks, each of which read at least one
         // of the output numbers.  For each number read in, the
@@ -226,10 +307,17 @@
         // is the number in question, and the value is the number of
         // times the key was emitted.  This is the same format as the
         // original answer key (except that numbers emitted zero times
-        // will not appear in the regenerated key.)
-        //
-        File finalOuts = new File(testdir, "finalouts");
-        fs.mkdirs(finalOuts);
+        // will not appear in the regenerated key.)  The answer set
+        // is split into a number of pieces.  A final MapReduce job
+        // will merge them.
+        //
+        // There's not really a need to go to 10 reduces here 
+        // instead of 1.  But we want to test what happens when
+        // you have multiple reduces at once.
+        //
+        int intermediateReduces = 10;
+        File intermediateOuts = new File(testdir, "intermediateouts");
+        fs.mkdirs(intermediateOuts);
         JobConf checkJob = new JobConf(conf);
         checkJob.setInputDir(randomOuts);
         checkJob.setInputKeyClass(LongWritable.class);
@@ -237,15 +325,41 @@
         checkJob.setInputFormat(TextInputFormat.class);
         checkJob.setMapperClass(RandomCheckMapper.class);
 
-        checkJob.setOutputDir(finalOuts);
+        checkJob.setOutputDir(intermediateOuts);
         checkJob.setOutputKeyClass(IntWritable.class);
         checkJob.setOutputValueClass(IntWritable.class);
         checkJob.setOutputFormat(SequenceFileOutputFormat.class);
         checkJob.setReducerClass(RandomCheckReducer.class);
-        checkJob.setNumReduceTasks(1);
+        checkJob.setNumReduceTasks(intermediateReduces);
 
         JobClient.runJob(checkJob);
 
+        //
+        // OK, now we take the output from the last job and
+        // merge it down to a single file.  The map() and reduce()
+        // functions don't really do anything except reemit tuples.
+        // But by having a single reduce task here, we end up merging
+        // all the files.
+        //
+        File finalOuts = new File(testdir, "finalouts");        
+        fs.mkdirs(finalOuts);
+        JobConf mergeJob = new JobConf(conf);
+        mergeJob.setInputDir(intermediateOuts);
+        mergeJob.setInputKeyClass(IntWritable.class);
+        mergeJob.setInputValueClass(IntWritable.class);
+        mergeJob.setInputFormat(SequenceFileInputFormat.class);
+        mergeJob.setMapperClass(MergeMapper.class);
+        
+        mergeJob.setOutputDir(finalOuts);
+        mergeJob.setOutputKeyClass(IntWritable.class);
+        mergeJob.setOutputValueClass(IntWritable.class);
+        mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
+        mergeJob.setReducerClass(MergeReducer.class);
+        mergeJob.setNumReduceTasks(1);
+        
+        JobClient.runJob(mergeJob);
+        
+ 
         //
         // Finally, we compare the reconstructed answer key with the
         // original one.  Remember, we need to ignore zero-count items