You are viewing a plain-text version of this content. The canonical link to the original post was a hyperlink that is not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/04/25 22:48:18 UTC
svn commit: r937871 - in /hadoop/mapreduce/trunk: ./
src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/
src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/
src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/
src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/
Author: cdouglas
Date: Sun Apr 25 20:48:17 2010
New Revision: 937871
URL: http://svn.apache.org/viewvc?rev=937871&view=rev
Log:
MAPREDUCE-1570. Add grouping comparators to MRUnit. Contributed by Chris White
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java
hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java
hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Apr 25 20:48:17 2010
@@ -267,6 +267,9 @@ Trunk (unreleased changes)
MAPREDUCE-1304. Add a task counter tracking time spent in GC. (Aaron
Kimball via cdouglas)
+ MAPREDUCE-1570. Add grouping comparators to MRUnit. (Chris White via
+ cdouglas)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java Sun Apr 25 20:48:17 2010
@@ -21,6 +21,7 @@ package org.apache.hadoop.mrunit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,6 +29,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Mapper;
@@ -48,7 +50,7 @@ import org.apache.hadoop.mrunit.types.Pa
* If a combiner is specified, then it will be run exactly once after
* the Mapper and before the Reducer.
*/
-public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
+public class MapReduceDriver<K1, V1, K2 extends Comparable, V2, K3, V3>
extends MapReduceDriverBase<K1, V1, K2, V2, K3, V3> {
public static final Log LOG = LogFactory.getLog(MapReduceDriver.class);
@@ -288,4 +290,30 @@ public class MapReduceDriver<K1, V1, K2,
public String toString() {
return "MapReduceDriver (" + myMapper + ", " + myReducer + ")";
}
+
+ /**
+ * Identical to {@link #setKeyGroupingComparator(RawComparator)}, but with a
+ * fluent programming style
+ * @param groupingComparator Comparator to use in the shuffle stage for key
+ * grouping
+ * @return this
+ */
+ public MapReduceDriver<K1, V1, K2, V2, K3, V3> withKeyGroupingComparator(
+ RawComparator<K2> groupingComparator) {
+ setKeyGroupingComparator(groupingComparator);
+ return this;
+ }
+
+ /**
+ * Identical to {@link #setKeyOrderComparator(RawComparator)}, but with a
+ * fluent programming style
+ * @param orderComparator Comparator to use in the shuffle stage for key
+ * value ordering
+ * @return this
+ */
+ public MapReduceDriver<K1, V1, K2, V2, K3, V3> withKeyOrderComparator(
+ RawComparator<K2> orderComparator) {
+ setKeyOrderComparator(orderComparator);
+ return this;
+ }
}
Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Sun Apr 25 20:48:17 2010
@@ -21,14 +21,16 @@ package org.apache.hadoop.mrunit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.TreeMap;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mrunit.types.Pair;
/**
@@ -41,15 +43,33 @@ import org.apache.hadoop.mrunit.types.Pa
* This is designed to handle a single (k, v)* -> (k, v)* case from the
* Mapper/Reducer pair, representing a single unit test.
*/
-public abstract class MapReduceDriverBase<K1, V1, K2, V2, K3, V3>
+public abstract class MapReduceDriverBase<K1, V1, K2 extends Comparable, V2, K3, V3>
extends TestDriver<K1, V1, K3, V3> {
public static final Log LOG = LogFactory.getLog(MapReduceDriverBase.class);
protected List<Pair<K1, V1>> inputList;
+
+ /** Key group comparator */
+ protected Comparator<K2> keyGroupComparator;
+
+ /** Key value order comparator */
+ protected Comparator<K2> keyValueOrderComparator;
public MapReduceDriverBase() {
inputList = new ArrayList<Pair<K1, V1>>();
+
+ // create a simple comparator for key grouping and ordering
+ Comparator<K2> simpleComparator = new Comparator<K2>() {
+ @Override
+ public int compare(K2 o1, K2 o2) {
+ return o1.compareTo(o2);
+ }
+ };
+
+ // assign simple key grouping and ordering comparator
+ keyGroupComparator = simpleComparator;
+ keyValueOrderComparator = null;
}
/**
@@ -158,38 +178,82 @@ public abstract class MapReduceDriverBas
* @return the sorted list of (key, list(val))'s to present to the reducer
*/
public List<Pair<K2, List<V2>>> shuffle(List<Pair<K2, V2>> mapOutputs) {
- HashMap<K2, List<V2>> reducerInputs = new HashMap<K2, List<V2>>();
-
- // step 1: condense all values with the same key into a list.
+ // step 1 - use the key group comparator to organise map outputs
+ final TreeMap<K2, List<Pair<K2,V2>>> groupedByKey =
+ new TreeMap<K2, List<Pair<K2,V2>>>(keyGroupComparator);
+
+ List<Pair<K2,V2>> groupedKeyList;
for (Pair<K2, V2> mapOutput : mapOutputs) {
- List<V2> valuesForKey = reducerInputs.get(mapOutput.getFirst());
-
- if (null == valuesForKey) {
- // this is the first (k, v) pair for this key. Add it to the list.
- valuesForKey = new ArrayList<V2>();
- valuesForKey.add(mapOutput.getSecond());
- reducerInputs.put(mapOutput.getFirst(), valuesForKey);
- } else {
- // add this value to the existing list for this key
- valuesForKey.add(mapOutput.getSecond());
+ groupedKeyList = groupedByKey.get(mapOutput.getFirst());
+
+ if (groupedKeyList == null) {
+ groupedKeyList = new ArrayList<Pair<K2,V2>>();
+ groupedByKey.put(mapOutput.getFirst(), groupedKeyList);
}
+
+ groupedKeyList.add(mapOutput);
}
-
- // build a list out of these (k, list(v)) pairs
- List<Pair<K2, List<V2>>> finalInputs = new ArrayList<Pair<K2, List<V2>>>();
- Set<Map.Entry<K2, List<V2>>> entries = reducerInputs.entrySet();
- for (Map.Entry<K2, List<V2>> entry : entries) {
- K2 key = entry.getKey();
- List<V2> vals = entry.getValue();
- finalInputs.add(new Pair<K2, List<V2>>(key, vals));
- }
-
- // and then sort the output list by key
- if (finalInputs.size() > 0) {
- Collections.sort(finalInputs,
- finalInputs.get(0).new FirstElemComparator());
- }
-
- return finalInputs;
+
+ // step 2 - sort each key group using the key order comparator (if set)
+ Comparator<Pair<K2,V2>> pairKeyComparator = new Comparator<Pair<K2, V2>>() {
+ @Override
+ public int compare(Pair<K2, V2> o1, Pair<K2, V2> o2) {
+ return keyValueOrderComparator.compare(o1.getFirst(), o2.getFirst());
+ }
+ };
+
+ // create shuffle stage output list
+ List<Pair<K2, List<V2>>> outputKeyValuesList =
+ new ArrayList<Pair<K2,List<V2>>>();
+
+ // populate output list
+ for (Entry<K2, List<Pair<K2, V2>>> groupedByKeyEntry :
+ groupedByKey.entrySet()) {
+ if (keyValueOrderComparator != null) {
+ // sort the key/value pairs using the key order comparator (if set)
+ Collections.sort(groupedByKeyEntry.getValue(), pairKeyComparator);
+ }
+
+ // create list to hold values for the grouped key
+ List<V2> valuesList = new ArrayList<V2>();
+ for (Pair<K2, V2> pair : groupedByKeyEntry.getValue()) {
+ valuesList.add(pair.getSecond());
+ }
+
+ // add key and values to output list
+ outputKeyValuesList.add(
+ new Pair<K2,List<V2>>(groupedByKeyEntry.getKey(), valuesList));
+ }
+
+ // return output list
+ return outputKeyValuesList;
+ }
+
+ /**
+ * Set the key grouping comparator, similar to calling the following API
+ * calls but passing a real instance rather than just the class:
+ * <UL>
+ * <LI>pre 0.20.1 API: {@link JobConf#setOutputValueGroupingComparator(Class)}
+ * <LI>0.20.1+ API: {@link Job#setGroupingComparatorClass(Class)}
+ * </UL>
+ * @param groupingComparator
+ */
+ public void setKeyGroupingComparator(
+ RawComparator<K2> groupingComparator) {
+ keyGroupComparator = groupingComparator;
+ }
+
+ /**
+ * Set the key value order comparator, similar to calling the following API
+ * calls but passing a real instance rather than just the class:
+ * <UL>
+ * <LI>pre 0.20.1 API: {@link JobConf#setOutputKeyComparatorClass(Class)}
+ * <LI>0.20.1+ API: {@link Job#setSortComparatorClass(Class)}
+ * </UL>
+ * @param orderComparator
+ */
+ public void setKeyOrderComparator(
+ RawComparator<K2> orderComparator) {
+ keyValueOrderComparator = orderComparator;
}
}
Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java Sun Apr 25 20:48:17 2010
@@ -21,6 +21,7 @@ package org.apache.hadoop.mrunit.mapredu
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,6 +30,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Mapper;
@@ -46,7 +48,7 @@ import org.apache.hadoop.mrunit.types.Pa
* This is designed to handle a single (k, v)* -> (k, v)* case from the
* Mapper/Reducer pair, representing a single unit test.
*/
-public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
+public class MapReduceDriver<K1, V1, K2 extends Comparable, V2, K3, V3>
extends MapReduceDriverBase<K1, V1, K2, V2, K3, V3> {
public static final Log LOG = LogFactory.getLog(MapReduceDriver.class);
@@ -240,4 +242,30 @@ public class MapReduceDriver<K1, V1, K2,
setConfiguration(configuration);
return this;
}
+
+ /**
+ * Identical to {@link #setKeyGroupingComparator(RawComparator)}, but with a
+ * fluent programming style
+ * @param groupingComparator Comparator to use in the shuffle stage for key
+ * grouping
+ * @return this
+ */
+ public MapReduceDriver<K1, V1, K2, V2, K3, V3> withKeyGroupingComparator(
+ RawComparator<K2> groupingComparator) {
+ setKeyGroupingComparator(groupingComparator);
+ return this;
+ }
+
+ /**
+ * Identical to {@link #setKeyOrderComparator(RawComparator)}, but with a
+ * fluent programming style
+ * @param orderComparator Comparator to use in the shuffle stage for key
+ * value ordering
+ * @return this
+ */
+ public MapReduceDriver<K1, V1, K2, V2, K3, V3> withKeyOrderComparator(
+ RawComparator<K2> orderComparator) {
+ setKeyOrderComparator(orderComparator);
+ return this;
+ }
}
Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java Sun Apr 25 20:48:17 2010
@@ -22,14 +22,20 @@ import static org.apache.hadoop.mrunit.t
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.LongSumReducer;
@@ -274,5 +280,74 @@ public class TestMapReduceDriver extends
.withOutput(new Text("foo"), new LongWritable(FOO_OUT))
.runTest();
}
+
+ // Test the key grouping and value ordering comparators
+ @Test
+ public void testComparators() {
+ // group comparator - group by first character
+ RawComparator groupComparator = new RawComparator() {
+ @Override
+ public int compare(Object o1, Object o2) {
+ return o1.toString().substring(0, 1).compareTo(
+ o2.toString().substring(0, 1));
+ }
+
+ @Override
+ public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
+ int arg4, int arg5) {
+ throw new RuntimeException("Not implemented");
+ }
+ };
+
+ // value order comparator - order by second character
+ RawComparator orderComparator = new RawComparator() {
+ @Override
+ public int compare(Object o1, Object o2) {
+ return o1.toString().substring(1, 2).compareTo(
+ o2.toString().substring(1, 2));
+ }
+
+ @Override
+ public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
+ int arg4, int arg5) {
+ throw new RuntimeException("Not implemented");
+ }
+ };
+
+ // reducer to track the order of the input values using bit shifting
+ driver.withReducer(new Reducer<Text, LongWritable, Text, LongWritable>() {
+ @Override
+ public void reduce(Text key, Iterator<LongWritable> values,
+ OutputCollector<Text, LongWritable> output, Reporter reporter)
+ throws IOException {
+ long outputValue = 0;
+ int count = 0;
+ while (values.hasNext()) {
+ outputValue |= (values.next().get() << (count++*8));
+ }
+
+ output.collect(key, new LongWritable(outputValue));
+ }
+
+ @Override
+ public void configure(JobConf job) {}
+
+ @Override
+ public void close() throws IOException {}
+ });
+
+ driver.withKeyGroupingComparator(groupComparator);
+ driver.withKeyOrderComparator(orderComparator);
+
+ driver.addInput(new Text("a1"), new LongWritable(1));
+ driver.addInput(new Text("b1"), new LongWritable(1));
+ driver.addInput(new Text("a3"), new LongWritable(3));
+ driver.addInput(new Text("a2"), new LongWritable(2));
+
+ driver.addOutput(new Text("a1"), new LongWritable(0x1 | (0x2 << 8) | (0x3 << 16)));
+ driver.addOutput(new Text("b1"), new LongWritable(0x1));
+
+ driver.runTest();
+ }
}
Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java Sun Apr 25 20:48:17 2010
@@ -22,6 +22,7 @@ import static org.apache.hadoop.mrunit.t
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
@@ -29,9 +30,14 @@ import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.mrunit.mapreduce.TestMapDriver.ConfigurationMapper;
import org.apache.hadoop.mrunit.mapreduce.TestReduceDriver.ConfigurationReducer;
@@ -256,5 +262,65 @@ public class TestMapReduceDriver extends
assertEquals(reducer.setupConfiguration.get("TestKey"), "TestValue");
}
+ // Test the key grouping and value ordering comparators
+ @Test
+ public void testComparators() {
+ // group comparator - group by first character
+ RawComparator groupComparator = new RawComparator() {
+ @Override
+ public int compare(Object o1, Object o2) {
+ return o1.toString().substring(0, 1).compareTo(
+ o2.toString().substring(0, 1));
+ }
+
+ @Override
+ public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
+ int arg4, int arg5) {
+ throw new RuntimeException("Not implemented");
+ }
+ };
+
+ // value order comparator - order by second character
+ RawComparator orderComparator = new RawComparator() {
+ @Override
+ public int compare(Object o1, Object o2) {
+ return o1.toString().substring(1, 2).compareTo(
+ o2.toString().substring(1, 2));
+ }
+
+ @Override
+ public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
+ int arg4, int arg5) {
+ throw new RuntimeException("Not implemented");
+ }
+ };
+
+ // reducer to track the order of the input values using bit shifting
+ driver.withReducer(new Reducer<Text, LongWritable, Text, LongWritable>() {
+ protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+ throws IOException, InterruptedException {
+ long outputValue = 0;
+ int count = 0;
+ for (LongWritable value : values) {
+ outputValue |= (value.get() << (count++*8));
+ }
+
+ context.write(key, new LongWritable(outputValue));
+ }
+ });
+
+ driver.withKeyGroupingComparator(groupComparator);
+ driver.withKeyOrderComparator(orderComparator);
+
+ driver.addInput(new Text("a1"), new LongWritable(1));
+ driver.addInput(new Text("b1"), new LongWritable(1));
+ driver.addInput(new Text("a3"), new LongWritable(3));
+ driver.addInput(new Text("a2"), new LongWritable(2));
+
+ driver.addOutput(new Text("a1"), new LongWritable(0x1 | (0x2 << 8) | (0x3 << 16)));
+ driver.addOutput(new Text("b1"), new LongWritable(0x1));
+
+ driver.runTest();
+ }
}