You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/12/16 17:48:51 UTC
svn commit: r727085 - in /hadoop/core/branches/branch-0.20: CHANGES.txt
src/examples/org/apache/hadoop/examples/ExampleDriver.java
src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Author: omalley
Date: Tue Dec 16 08:48:51 2008
New Revision: 727085
URL: http://svn.apache.org/viewvc?rev=727085&view=rev
Log:
HADOOP-4545. Add example and test case of secondary sort for the reduce.
(omalley) Merge of -r 727080:727081 from trunk to 0.20.
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt
hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/ExampleDriver.java
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=727085&r1=727084&r2=727085&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue Dec 16 08:48:51 2008
@@ -258,8 +258,9 @@
HADOOP-4728. Add a test exercising different namenode configurations.
(Boris Shkolnik via cdouglas)
- HADOOP-4807. Adds JobClient commands to get the active/blacklisted tracker names.
- Also adds commands to display running/completed task attempt IDs. (ddas)
+ HADOOP-4807. Adds JobClient commands to get the active/blacklisted tracker
+ names. Also adds commands to display running/completed task attempt IDs.
+ (ddas)
HADOOP-4699. Remove checksum validation from map output servlet. (cdouglas)
@@ -267,15 +268,19 @@
(Sanjay Radia via acmurthy)
HADOOP-3136. Fixed the default scheduler to assign multiple tasks to each
- tasktracker per heartbeat, when feasible. To ensure locality isn't hurt too
- badly, the scheduler will not assign more than one off-switch task per
+ tasktracker per heartbeat, when feasible. To ensure locality isn't hurt
+ too badly, the scheduler will not assign more than one off-switch task per
heartbeat. The heartbeat interval is also halved since the task-tracker is
- fixed to no longer send out heartbeats on each task completion. A slow-start
- for scheduling reduces is introduced to ensure that reduces aren't started
- till sufficient number of maps are done, else reduces of jobs whose maps
- aren't scheduled might swamp the cluster. (acmurthy)
+ fixed to no longer send out heartbeats on each task completion. A
+ slow-start for scheduling reduces is introduced to ensure that reduces
+ aren't started till sufficient number of maps are done, else reduces of
+ jobs whose maps aren't scheduled might swamp the cluster.
Configuration changes to mapred-default.xml:
add mapred.reduce.slowstart.completed.maps
+ (acmurthy)
+
+ HADOOP-4545. Add example and test case of secondary sort for the reduce.
+ (omalley)
OPTIMIZATIONS
Modified: hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=727085&r1=727084&r2=727085&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ hadoop/core/branches/branch-0.20/src/examples/org/apache/hadoop/examples/ExampleDriver.java Tue Dec 16 08:48:51 2008
@@ -51,6 +51,8 @@
pgd.addClass("pi", PiEstimator.class, "A map/reduce program that estimates Pi using monte-carlo method.");
pgd.addClass("pentomino", DistributedPentomino.class,
"A map/reduce tile laying program to find solutions to pentomino problems.");
+ pgd.addClass("secondarysort", SecondarySort.class,
+ "An example defining a secondary sort to the reduce.");
pgd.addClass("sudoku", Sudoku.class, "A sudoku solver.");
pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");
Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=727085&r1=727084&r2=727085&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Dec 16 08:48:51 2008
@@ -27,7 +27,11 @@
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SecondarySort;
import org.apache.hadoop.examples.WordCount;
+import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
+import org.apache.hadoop.examples.SecondarySort.FirstPartitioner;
+import org.apache.hadoop.examples.SecondarySort.IntPair;
import org.apache.hadoop.examples.WordCount.IntSumReducer;
import org.apache.hadoop.examples.WordCount.TokenizerMapper;
import org.apache.hadoop.fs.FileSystem;
@@ -81,25 +85,74 @@
try {
mr = new MiniMRCluster(2, "file:///", 3);
Configuration conf = mr.createJobConf();
- writeFile("in/part1", "this is a test\nof word count\n");
- writeFile("in/part2", "more test");
- Job job = new Job(conf, "word count");
- job.setJarByClass(WordCount.class);
- job.setMapperClass(TokenizerMapper.class);
- job.setCombinerClass(IntSumReducer.class);
- job.setReducerClass(IntSumReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
- FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
- assertTrue(job.waitForCompletion());
- String out = readFile("out/part-r-00000");
- System.out.println(out);
- assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t2\nthis\t1\nword\t1\n",
- out);
+ runWordCount(conf);
+ runSecondarySort(conf);
} finally {
if (mr != null) { mr.shutdown(); }
}
}
+
+ private void runWordCount(Configuration conf
+ ) throws IOException,
+ InterruptedException,
+ ClassNotFoundException {
+ localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+ localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+ writeFile("in/part1", "this is a test\nof word count\n");
+ writeFile("in/part2", "more test");
+ Job job = new Job(conf, "word count");
+ job.setJarByClass(WordCount.class);
+ job.setMapperClass(TokenizerMapper.class);
+ job.setCombinerClass(IntSumReducer.class);
+ job.setReducerClass(IntSumReducer.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+ FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+ FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
+ assertTrue(job.waitForCompletion());
+ String out = readFile("out/part-r-00000");
+ System.out.println(out);
+ assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t2\nthis\t1\nword\t1\n",
+ out);
+ }
+
+ private void runSecondarySort(Configuration conf) throws IOException,
+ InterruptedException,
+ ClassNotFoundException {
+ localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+ localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+ writeFile("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
+ "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
+ Job job = new Job(conf, "word count");
+ job.setJarByClass(WordCount.class);
+ job.setMapperClass(SecondarySort.MapClass.class);
+ job.setReducerClass(SecondarySort.Reduce.class);
+ // group and partition by the first int in the pair
+ job.setPartitionerClass(FirstPartitioner.class);
+ job.setGroupingComparatorClass(FirstGroupingComparator.class);
+
+ // the map output is IntPair, IntWritable
+ job.setMapOutputKeyClass(IntPair.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ // the reduce output is Text, IntWritable
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+ FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
+ assertTrue(job.waitForCompletion());
+ String out = readFile("out/part-r-00000");
+ assertEquals("------------------------------------------------\n" +
+ "-3\t23\n" +
+ "------------------------------------------------\n" +
+ "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
+ "------------------------------------------------\n" +
+ "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
+ "------------------------------------------------\n" +
+ "5\t10\n" +
+ "------------------------------------------------\n" +
+ "10\t20\n10\t25\n10\t30\n", out);
+ }
}