You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/12/16 17:48:51 UTC

svn commit: r727085 - in /hadoop/core/branches/branch-0.20: CHANGES.txt src/examples/org/apache/hadoop/examples/ExampleDriver.java src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Author: omalley
Date: Tue Dec 16 08:48:51 2008
New Revision: 727085

URL: http://svn.apache.org/viewvc?rev=727085&view=rev
Log:
HADOOP-4545. Add example and test case of secondary sort for the reduce.
(omalley) Merge of -r 727080:727081 from trunk to 0.20.

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/ExampleDriver.java
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=727085&r1=727084&r2=727085&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue Dec 16 08:48:51 2008
@@ -258,8 +258,9 @@
     HADOOP-4728. Add a test exercising different namenode configurations.
     (Boris Shkolnik via cdouglas)
 
-    HADOOP-4807. Adds JobClient commands to get the active/blacklisted tracker names.
-    Also adds commands to display running/completed task attempt IDs. (ddas)
+    HADOOP-4807. Adds JobClient commands to get the active/blacklisted tracker
+    names. Also adds commands to display running/completed task attempt IDs. 
+    (ddas)
 
     HADOOP-4699. Remove checksum validation from map output servlet. (cdouglas)
 
@@ -267,15 +268,19 @@
     (Sanjay Radia via acmurthy) 
 
     HADOOP-3136. Fixed the default scheduler to assign multiple tasks to each 
-    tasktracker per heartbeat, when feasible. To ensure locality isn't hurt too 
-    badly, the scheudler will not assign more than one off-switch task per 
+    tasktracker per heartbeat, when feasible. To ensure locality isn't hurt 
+    too badly, the scheudler will not assign more than one off-switch task per 
     heartbeat. The heartbeat interval is also halved since the task-tracker is 
-    fixed to no longer send out heartbeats on each task completion. A slow-start
-    for scheduling reduces is introduced to ensure that reduces aren't started 
-    till sufficient number of maps are done, else reduces of jobs whose maps 
-    aren't scheduled might swamp the cluster. (acmurthy)
+    fixed to no longer send out heartbeats on each task completion. A 
+    slow-start for scheduling reduces is introduced to ensure that reduces 
+    aren't started till sufficient number of maps are done, else reduces of 
+    jobs whose maps aren't scheduled might swamp the cluster.
     Configuration changes to mapred-default.xml:
       add mapred.reduce.slowstart.completed.maps 
+    (acmurthy)
+
+    HADOOP-4545. Add example and test case of secondary sort for the reduce.
+    (omalley)
 
   OPTIMIZATIONS
 

Modified: hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=727085&r1=727084&r2=727085&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/ExampleDriver.java Tue Dec 16 08:48:51 2008
@@ -51,6 +51,8 @@
       pgd.addClass("pi", PiEstimator.class, "A map/reduce program that estimates Pi using monte-carlo method.");
       pgd.addClass("pentomino", DistributedPentomino.class,
       "A map/reduce tile laying program to find solutions to pentomino problems.");
+      pgd.addClass("secondarysort", SecondarySort.class,
+                   "An example defining a secondary sort to the reduce.");
       pgd.addClass("sudoku", Sudoku.class, "A sudoku solver.");
       pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
       pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");

Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=727085&r1=727084&r2=727085&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Dec 16 08:48:51 2008
@@ -27,7 +27,11 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SecondarySort;
 import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
+import org.apache.hadoop.examples.SecondarySort.FirstPartitioner;
+import org.apache.hadoop.examples.SecondarySort.IntPair;
 import org.apache.hadoop.examples.WordCount.IntSumReducer;
 import org.apache.hadoop.examples.WordCount.TokenizerMapper;
 import org.apache.hadoop.fs.FileSystem;
@@ -81,25 +85,74 @@
     try {
       mr = new MiniMRCluster(2, "file:///", 3);
       Configuration conf = mr.createJobConf();
-      writeFile("in/part1", "this is a test\nof word count\n");
-      writeFile("in/part2", "more test");
-      Job job = new Job(conf, "word count");     
-      job.setJarByClass(WordCount.class);
-      job.setMapperClass(TokenizerMapper.class);
-      job.setCombinerClass(IntSumReducer.class);
-      job.setReducerClass(IntSumReducer.class);
-      job.setOutputKeyClass(Text.class);
-      job.setOutputValueClass(IntWritable.class);
-      FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-      FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
-      assertTrue(job.waitForCompletion());
-      String out = readFile("out/part-r-00000");
-      System.out.println(out);
-      assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t2\nthis\t1\nword\t1\n",
-                   out);
+      runWordCount(conf);
+      runSecondarySort(conf);
     } finally {
       if (mr != null) { mr.shutdown(); }
     }
   }
+
+  private void runWordCount(Configuration conf
+                            ) throws IOException,
+                                     InterruptedException,
+                                     ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);    
+    writeFile("in/part1", "this is a test\nof word count\n");
+    writeFile("in/part2", "more test");
+    Job job = new Job(conf, "word count");     
+    job.setJarByClass(WordCount.class);
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
+    assertTrue(job.waitForCompletion());
+    String out = readFile("out/part-r-00000");
+    System.out.println(out);
+    assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t2\nthis\t1\nword\t1\n",
+                 out);
+  }
+
+  private void runSecondarySort(Configuration conf) throws IOException,
+                                                        InterruptedException,
+                                                        ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    writeFile("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
+              "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
+    Job job = new Job(conf, "word count");     
+    job.setJarByClass(WordCount.class);
+    job.setMapperClass(SecondarySort.MapClass.class);
+    job.setReducerClass(SecondarySort.Reduce.class);
+    // group and partition by the first int in the pair
+    job.setPartitionerClass(FirstPartitioner.class);
+    job.setGroupingComparatorClass(FirstGroupingComparator.class);
+
+    // the map output is IntPair, IntWritable
+    job.setMapOutputKeyClass(IntPair.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    // the reduce output is Text, IntWritable
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
+    assertTrue(job.waitForCompletion());
+    String out = readFile("out/part-r-00000");
+    assertEquals("------------------------------------------------\n" +
+                 "-3\t23\n" +
+                 "------------------------------------------------\n" +
+                 "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
+                 "------------------------------------------------\n" +
+                 "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
+                 "------------------------------------------------\n" +
+                 "5\t10\n" +
+                 "------------------------------------------------\n" +
+                 "10\t20\n10\t25\n10\t30\n", out);
+  }
   
 }