You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@mrunit.apache.org by db...@apache.org on 2012/08/03 11:39:46 UTC
svn commit: r1368848 - in /mrunit/trunk/src:
main/java/org/apache/hadoop/mrunit/ main/java/org/apache/hadoop/mrunit/types/
test/java/org/apache/hadoop/mrunit/
test/java/org/apache/hadoop/mrunit/mapreduce/
Author: dbeech
Date: Fri Aug 3 09:39:45 2012
New Revision: 1368848
URL: http://svn.apache.org/viewvc?rev=1368848&view=rev
Log:
MRUNIT-127: Key grouping with GroupingComparators is not consistent with MapReduce behaviour
Modified:
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/types/Pair.java
mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=1368848&r1=1368847&r2=1368848&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Fri Aug 3 09:39:45 2012
@@ -21,9 +21,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
-import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -163,7 +164,22 @@ public abstract class MapReduceDriverBas
* @return the sorted list of (key, list(val))'s to present to the reducer
*/
public List<Pair<K2, List<V2>>> shuffle(final List<Pair<K2, V2>> mapOutputs) {
- // step 1 - use the key group comparator to organise map outputs
+
+ // sort the map outputs using the key order comparator (if set)
+ if (keyValueOrderComparator != null) {
+ final Comparator<Pair<K2, V2>> pairKeyComparator = new Comparator<Pair<K2, V2>>() {
+ @Override
+ public int compare(final Pair<K2, V2> o1, final Pair<K2, V2> o2) {
+ return keyValueOrderComparator.compare(o1.getFirst(), o2.getFirst());
+ }
+ };
+ Collections.sort(mapOutputs, pairKeyComparator);
+ }
+ else {
+ Collections.sort(mapOutputs, new Pair.FirstElemComparator());
+ }
+
+ // initialise grouping comparator
final Comparator<K2> keyGroupComparator;
if (this.keyGroupComparator == null) {
keyGroupComparator = new JobConf(getConfiguration())
@@ -171,39 +187,28 @@ public abstract class MapReduceDriverBas
} else {
keyGroupComparator = this.keyGroupComparator;
}
- final TreeMap<K2, List<Pair<K2, V2>>> groupedByKey = new TreeMap<K2, List<Pair<K2, V2>>>(
- keyGroupComparator);
-
- List<Pair<K2, V2>> groupedKeyList;
+
+ // apply grouping comparator to create groups
+ final Map<K2, List<Pair<K2, V2>>> groupedByKey =
+ new LinkedHashMap<K2, List<Pair<K2, V2>>>();
+
+ List<Pair<K2, V2>> groupedKeyList = null;
+ Pair<K2,V2> previous = null;
+
for (final Pair<K2, V2> mapOutput : mapOutputs) {
- groupedKeyList = groupedByKey.get(mapOutput.getFirst());
-
- if (groupedKeyList == null) {
+ if (previous == null || keyGroupComparator
+ .compare(previous.getFirst(), mapOutput.getFirst()) != 0) {
groupedKeyList = new ArrayList<Pair<K2, V2>>();
groupedByKey.put(mapOutput.getFirst(), groupedKeyList);
}
-
groupedKeyList.add(mapOutput);
+ previous = mapOutput;
}
- // step 2 - sort each key group using the key order comparator (if set)
- final Comparator<Pair<K2, V2>> pairKeyComparator = new Comparator<Pair<K2, V2>>() {
- @Override
- public int compare(final Pair<K2, V2> o1, final Pair<K2, V2> o2) {
- return keyValueOrderComparator.compare(o1.getFirst(), o2.getFirst());
- }
- };
-
- // create shuffle stage output list
- final List<Pair<K2, List<V2>>> outputKeyValuesList = new ArrayList<Pair<K2, List<V2>>>();
-
// populate output list
- for (final Entry<K2, List<Pair<K2, V2>>> groupedByKeyEntry : groupedByKey
- .entrySet()) {
- if (keyValueOrderComparator != null) {
- // sort the key/value pairs using the key order comparator (if set)
- Collections.sort(groupedByKeyEntry.getValue(), pairKeyComparator);
- }
+ final List<Pair<K2, List<V2>>> outputKeyValuesList = new ArrayList<Pair<K2, List<V2>>>();
+ for (final Entry<K2, List<Pair<K2, V2>>> groupedByKeyEntry :
+ groupedByKey.entrySet()) {
// create list to hold values for the grouped key
final List<V2> valuesList = new ArrayList<V2>();
@@ -216,7 +221,6 @@ public abstract class MapReduceDriverBas
groupedByKeyEntry.getKey(), valuesList));
}
- // return output list
return outputKeyValuesList;
}
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/types/Pair.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/types/Pair.java?rev=1368848&r1=1368847&r2=1368848&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/types/Pair.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/types/Pair.java Fri Aug 3 09:39:45 2012
@@ -69,8 +69,7 @@ public class Pair<S, T> implements Compa
return firstResult;
}
- // TODO: Can this be made static? Same with SecondElemComparator?
- public class FirstElemComparator implements Comparator<Pair<S, T>> {
+ public static class FirstElemComparator<S, T> implements Comparator<Pair<S, T>> {
public FirstElemComparator() {
}
@@ -80,7 +79,7 @@ public class Pair<S, T> implements Compa
}
}
- public class SecondElemComparator implements Comparator<Pair<S, T>> {
+ public static class SecondElemComparator<S, T> implements Comparator<Pair<S, T>> {
public SecondElemComparator() {
}
Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java?rev=1368848&r1=1368847&r2=1368848&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java Fri Aug 3 09:39:45 2012
@@ -279,7 +279,7 @@ public class TestMapReduceDriver {
/**
* group comparator - group by first character
*/
- public static class GroupComparator implements RawComparator<Text> {
+ public static class FirstCharComparator implements RawComparator<Text> {
@Override
public int compare(final Text o1, final Text o2) {
@@ -298,7 +298,7 @@ public class TestMapReduceDriver {
/**
* value order comparator - order by second character
*/
- public static class OrderComparator implements RawComparator<Text> {
+ public static class SecondCharComparator implements RawComparator<Text> {
@Override
public int compare(final Text o1, final Text o2) {
return o1.toString().substring(1, 2)
@@ -339,17 +339,17 @@ public class TestMapReduceDriver {
}
});
- driver.withKeyGroupingComparator(new GroupComparator());
- driver.withKeyOrderComparator(new OrderComparator());
+ driver.withKeyGroupingComparator(new FirstCharComparator());
+ driver.withKeyOrderComparator(new SecondCharComparator());
driver.addInput(new Text("a1"), new LongWritable(1));
driver.addInput(new Text("b1"), new LongWritable(1));
driver.addInput(new Text("a3"), new LongWritable(3));
driver.addInput(new Text("a2"), new LongWritable(2));
- driver.addOutput(new Text("a1"), new LongWritable(0x1 | (0x2 << 8)
- | (0x3 << 16)));
+ driver.addOutput(new Text("a1"), new LongWritable(0x1));
driver.addOutput(new Text("b1"), new LongWritable(0x1));
+ driver.addOutput(new Text("a2"), new LongWritable(0x2 | (0x3 << 8)));
driver.runTest();
}
@@ -580,4 +580,39 @@ public class TestMapReduceDriver {
assertNotNull(mapper.getMapInputPath());
assertEquals(mapInputPath.getName(), mapper.getMapInputPath().getName());
}
+
+ @Test
+ public void testGroupingComparatorBehaviour1() throws IOException {
+ driver.withInput(new Text("A1"),new LongWritable(1L))
+ .withInput(new Text("A2"),new LongWritable(1L))
+ .withInput(new Text("B1"),new LongWritable(1L))
+ .withInput(new Text("B2"),new LongWritable(1L))
+ .withInput(new Text("C1"),new LongWritable(1L))
+ .withOutput(new Text("A1"),new LongWritable(2L))
+ .withOutput(new Text("B1"),new LongWritable(2L))
+ .withOutput(new Text("C1"),new LongWritable(1L))
+ .withKeyGroupingComparator(new FirstCharComparator())
+ .runTest(false);
+ }
+
+ @Test
+ public void testGroupingComparatorBehaviour2() throws IOException {
+ // this test fails pre-MRUNIT-127 because of the incorrect
+ // grouping of reduce keys in "shuffle".
+ // MapReduce doesn't group keys which aren't in a contiguous
+ // range when sorted by their sorting comparator.
+ driver.withInput(new Text("1A"),new LongWritable(1L))
+ .withInput(new Text("2A"),new LongWritable(1L))
+ .withInput(new Text("1B"),new LongWritable(1L))
+ .withInput(new Text("2B"),new LongWritable(1L))
+ .withInput(new Text("1C"),new LongWritable(1L))
+ .withOutput(new Text("1A"),new LongWritable(1L))
+ .withOutput(new Text("2A"),new LongWritable(1L))
+ .withOutput(new Text("1B"),new LongWritable(1L))
+ .withOutput(new Text("2B"),new LongWritable(1L))
+ .withOutput(new Text("1C"),new LongWritable(1L))
+ .withKeyGroupingComparator(new SecondCharComparator())
+ .runTest(false);
+ }
+
}
Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java?rev=1368848&r1=1368847&r2=1368848&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java Fri Aug 3 09:39:45 2012
@@ -41,8 +41,8 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.mrunit.ExpectedSuppliedException;
-import org.apache.hadoop.mrunit.TestMapReduceDriver.GroupComparator;
-import org.apache.hadoop.mrunit.TestMapReduceDriver.OrderComparator;
+import org.apache.hadoop.mrunit.TestMapReduceDriver.FirstCharComparator;
+import org.apache.hadoop.mrunit.TestMapReduceDriver.SecondCharComparator;
import org.apache.hadoop.mrunit.mapreduce.TestMapDriver.ConfigurationMapper;
import org.apache.hadoop.mrunit.mapreduce.TestReduceDriver.ConfigurationReducer;
import org.apache.hadoop.mrunit.types.Pair;
@@ -282,17 +282,17 @@ public class TestMapReduceDriver {
}
});
- driver.withKeyGroupingComparator(new GroupComparator());
- driver.withKeyOrderComparator(new OrderComparator());
+ driver.withKeyGroupingComparator(new FirstCharComparator());
+ driver.withKeyOrderComparator(new SecondCharComparator());
driver.addInput(new Text("a1"), new LongWritable(1));
driver.addInput(new Text("b1"), new LongWritable(1));
driver.addInput(new Text("a3"), new LongWritable(3));
driver.addInput(new Text("a2"), new LongWritable(2));
- driver.addOutput(new Text("a1"), new LongWritable(0x1 | (0x2 << 8)
- | (0x3 << 16)));
+ driver.addOutput(new Text("a1"), new LongWritable(0x1));
driver.addOutput(new Text("b1"), new LongWritable(0x1));
+ driver.addOutput(new Text("a2"), new LongWritable(0x2 | (0x3 << 8)));
driver.runTest();
}
@@ -520,4 +520,39 @@ public class TestMapReduceDriver {
assertNotNull(mapper.getMapInputPath());
assertEquals(mapInputPath.getName(), mapper.getMapInputPath().getName());
}
+
+ @Test
+ public void testGroupingComparatorBehaviour1() throws IOException {
+ driver.withInput(new Text("A1"),new LongWritable(1L))
+ .withInput(new Text("A2"),new LongWritable(1L))
+ .withInput(new Text("B1"),new LongWritable(1L))
+ .withInput(new Text("B2"),new LongWritable(1L))
+ .withInput(new Text("C1"),new LongWritable(1L))
+ .withOutput(new Text("A1"),new LongWritable(2L))
+ .withOutput(new Text("B1"),new LongWritable(2L))
+ .withOutput(new Text("C1"),new LongWritable(1L))
+ .withKeyGroupingComparator(new FirstCharComparator())
+ .runTest(false);
+ }
+
+ @Test
+ public void testGroupingComparatorBehaviour2() throws IOException {
+ // this test fails pre-MRUNIT-127 because of the incorrect
+ // grouping of reduce keys in "shuffle".
+ // MapReduce doesn't group keys which aren't in a contiguous
+ // range when sorted by their sorting comparator.
+ driver.withInput(new Text("1A"),new LongWritable(1L))
+ .withInput(new Text("2A"),new LongWritable(1L))
+ .withInput(new Text("1B"),new LongWritable(1L))
+ .withInput(new Text("2B"),new LongWritable(1L))
+ .withInput(new Text("1C"),new LongWritable(1L))
+ .withOutput(new Text("1A"),new LongWritable(1L))
+ .withOutput(new Text("2A"),new LongWritable(1L))
+ .withOutput(new Text("1B"),new LongWritable(1L))
+ .withOutput(new Text("2B"),new LongWritable(1L))
+ .withOutput(new Text("1C"),new LongWritable(1L))
+ .withKeyGroupingComparator(new SecondCharComparator())
+ .runTest(false);
+ }
+
}