You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@mrunit.apache.org by db...@apache.org on 2012/08/03 11:39:46 UTC

svn commit: r1368848 - in /mrunit/trunk/src: main/java/org/apache/hadoop/mrunit/ main/java/org/apache/hadoop/mrunit/types/ test/java/org/apache/hadoop/mrunit/ test/java/org/apache/hadoop/mrunit/mapreduce/

Author: dbeech
Date: Fri Aug  3 09:39:45 2012
New Revision: 1368848

URL: http://svn.apache.org/viewvc?rev=1368848&view=rev
Log:
MRUNIT-127: Key grouping with GroupingComparators is not consistent with MapReduce behaviour

Modified:
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/types/Pair.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=1368848&r1=1368847&r2=1368848&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Fri Aug  3 09:39:45 2012
@@ -21,9 +21,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -163,7 +164,22 @@ public abstract class MapReduceDriverBas
    * @return the sorted list of (key, list(val))'s to present to the reducer
    */
   public List<Pair<K2, List<V2>>> shuffle(final List<Pair<K2, V2>> mapOutputs) {
-    // step 1 - use the key group comparator to organise map outputs
+    
+    // sort the map outputs using the key order comparator (if set)
+    if (keyValueOrderComparator != null) {
+      final Comparator<Pair<K2, V2>> pairKeyComparator = new Comparator<Pair<K2, V2>>() {
+        @Override
+        public int compare(final Pair<K2, V2> o1, final Pair<K2, V2> o2) {
+          return keyValueOrderComparator.compare(o1.getFirst(), o2.getFirst());
+        }
+      };
+      Collections.sort(mapOutputs, pairKeyComparator);
+    }
+    else {
+      Collections.sort(mapOutputs, new Pair.FirstElemComparator());
+    }
+    
+    // initialise grouping comparator
     final Comparator<K2> keyGroupComparator;
     if (this.keyGroupComparator == null) {
       keyGroupComparator = new JobConf(getConfiguration())
@@ -171,39 +187,28 @@ public abstract class MapReduceDriverBas
     } else {
       keyGroupComparator = this.keyGroupComparator;
     }
-    final TreeMap<K2, List<Pair<K2, V2>>> groupedByKey = new TreeMap<K2, List<Pair<K2, V2>>>(
-        keyGroupComparator);
-
-    List<Pair<K2, V2>> groupedKeyList;
+    
+    // apply grouping comparator to create groups
+    final Map<K2, List<Pair<K2, V2>>> groupedByKey = 
+        new LinkedHashMap<K2, List<Pair<K2, V2>>>();
+    
+    List<Pair<K2, V2>> groupedKeyList = null;
+    Pair<K2,V2> previous = null;
+    
     for (final Pair<K2, V2> mapOutput : mapOutputs) {
-      groupedKeyList = groupedByKey.get(mapOutput.getFirst());
-
-      if (groupedKeyList == null) {
+      if (previous == null || keyGroupComparator
+          .compare(previous.getFirst(), mapOutput.getFirst()) != 0) {
         groupedKeyList = new ArrayList<Pair<K2, V2>>();
         groupedByKey.put(mapOutput.getFirst(), groupedKeyList);
       }
-
       groupedKeyList.add(mapOutput);
+      previous = mapOutput;
     }
 
-    // step 2 - sort each key group using the key order comparator (if set)
-    final Comparator<Pair<K2, V2>> pairKeyComparator = new Comparator<Pair<K2, V2>>() {
-      @Override
-      public int compare(final Pair<K2, V2> o1, final Pair<K2, V2> o2) {
-        return keyValueOrderComparator.compare(o1.getFirst(), o2.getFirst());
-      }
-    };
-
-    // create shuffle stage output list
-    final List<Pair<K2, List<V2>>> outputKeyValuesList = new ArrayList<Pair<K2, List<V2>>>();
-
     // populate output list
-    for (final Entry<K2, List<Pair<K2, V2>>> groupedByKeyEntry : groupedByKey
-        .entrySet()) {
-      if (keyValueOrderComparator != null) {
-        // sort the key/value pairs using the key order comparator (if set)
-        Collections.sort(groupedByKeyEntry.getValue(), pairKeyComparator);
-      }
+    final List<Pair<K2, List<V2>>> outputKeyValuesList = new ArrayList<Pair<K2, List<V2>>>();
+    for (final Entry<K2, List<Pair<K2, V2>>> groupedByKeyEntry : 
+            groupedByKey.entrySet()) {
 
       // create list to hold values for the grouped key
       final List<V2> valuesList = new ArrayList<V2>();
@@ -216,7 +221,6 @@ public abstract class MapReduceDriverBas
           groupedByKeyEntry.getKey(), valuesList));
     }
 
-    // return output list
     return outputKeyValuesList;
   }
 

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/types/Pair.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/types/Pair.java?rev=1368848&r1=1368847&r2=1368848&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/types/Pair.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/types/Pair.java Fri Aug  3 09:39:45 2012
@@ -69,8 +69,7 @@ public class Pair<S, T> implements Compa
     return firstResult;
   }
 
-  // TODO: Can this be made static? Same with SecondElemComparator?
-  public class FirstElemComparator implements Comparator<Pair<S, T>> {
+  public static class FirstElemComparator<S, T> implements Comparator<Pair<S, T>> {
     public FirstElemComparator() {
     }
 
@@ -80,7 +79,7 @@ public class Pair<S, T> implements Compa
     }
   }
 
-  public class SecondElemComparator implements Comparator<Pair<S, T>> {
+  public static class SecondElemComparator<S, T> implements Comparator<Pair<S, T>> {
     public SecondElemComparator() {
     }
 

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java?rev=1368848&r1=1368847&r2=1368848&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java Fri Aug  3 09:39:45 2012
@@ -279,7 +279,7 @@ public class TestMapReduceDriver {
   /**
    * group comparator - group by first character
    */
-  public static class GroupComparator implements RawComparator<Text> {
+  public static class FirstCharComparator implements RawComparator<Text> {
 
     @Override
     public int compare(final Text o1, final Text o2) {
@@ -298,7 +298,7 @@ public class TestMapReduceDriver {
   /**
    * value order comparator - order by second character
    */
-  public static class OrderComparator implements RawComparator<Text> {
+  public static class SecondCharComparator implements RawComparator<Text> {
     @Override
     public int compare(final Text o1, final Text o2) {
       return o1.toString().substring(1, 2)
@@ -339,17 +339,17 @@ public class TestMapReduceDriver {
       }
     });
 
-    driver.withKeyGroupingComparator(new GroupComparator());
-    driver.withKeyOrderComparator(new OrderComparator());
+    driver.withKeyGroupingComparator(new FirstCharComparator());
+    driver.withKeyOrderComparator(new SecondCharComparator());
 
     driver.addInput(new Text("a1"), new LongWritable(1));
     driver.addInput(new Text("b1"), new LongWritable(1));
     driver.addInput(new Text("a3"), new LongWritable(3));
     driver.addInput(new Text("a2"), new LongWritable(2));
 
-    driver.addOutput(new Text("a1"), new LongWritable(0x1 | (0x2 << 8)
-        | (0x3 << 16)));
+    driver.addOutput(new Text("a1"), new LongWritable(0x1));
     driver.addOutput(new Text("b1"), new LongWritable(0x1));
+    driver.addOutput(new Text("a2"), new LongWritable(0x2 | (0x3 << 8)));
 
     driver.runTest();
   }
@@ -580,4 +580,39 @@ public class TestMapReduceDriver {
     assertNotNull(mapper.getMapInputPath());
     assertEquals(mapInputPath.getName(), mapper.getMapInputPath().getName());
   }
+
+  @Test
+  public void testGroupingComparatorBehaviour1() throws IOException {
+    driver.withInput(new Text("A1"),new LongWritable(1L))
+      .withInput(new Text("A2"),new LongWritable(1L))
+      .withInput(new Text("B1"),new LongWritable(1L))
+      .withInput(new Text("B2"),new LongWritable(1L))
+      .withInput(new Text("C1"),new LongWritable(1L))
+      .withOutput(new Text("A1"),new LongWritable(2L))
+      .withOutput(new Text("B1"),new LongWritable(2L))
+      .withOutput(new Text("C1"),new LongWritable(1L))
+      .withKeyGroupingComparator(new FirstCharComparator())
+      .runTest(false);
+  }
+
+  @Test
+  public void testGroupingComparatorBehaviour2() throws IOException {
+    // this test fails pre-MRUNIT-127 because of the incorrect 
+    // grouping of reduce keys in "shuffle". 
+    // MapReduce doesn't group keys which aren't in a contiguous
+    // range when sorted by their sorting comparator. 
+    driver.withInput(new Text("1A"),new LongWritable(1L))
+      .withInput(new Text("2A"),new LongWritable(1L))
+      .withInput(new Text("1B"),new LongWritable(1L))
+      .withInput(new Text("2B"),new LongWritable(1L))
+      .withInput(new Text("1C"),new LongWritable(1L))
+      .withOutput(new Text("1A"),new LongWritable(1L))
+      .withOutput(new Text("2A"),new LongWritable(1L))
+      .withOutput(new Text("1B"),new LongWritable(1L))
+      .withOutput(new Text("2B"),new LongWritable(1L))
+      .withOutput(new Text("1C"),new LongWritable(1L))
+      .withKeyGroupingComparator(new SecondCharComparator())
+      .runTest(false);
+  }
+
 }

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java?rev=1368848&r1=1368847&r2=1368848&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java Fri Aug  3 09:39:45 2012
@@ -41,8 +41,8 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.mrunit.ExpectedSuppliedException;
-import org.apache.hadoop.mrunit.TestMapReduceDriver.GroupComparator;
-import org.apache.hadoop.mrunit.TestMapReduceDriver.OrderComparator;
+import org.apache.hadoop.mrunit.TestMapReduceDriver.FirstCharComparator;
+import org.apache.hadoop.mrunit.TestMapReduceDriver.SecondCharComparator;
 import org.apache.hadoop.mrunit.mapreduce.TestMapDriver.ConfigurationMapper;
 import org.apache.hadoop.mrunit.mapreduce.TestReduceDriver.ConfigurationReducer;
 import org.apache.hadoop.mrunit.types.Pair;
@@ -282,17 +282,17 @@ public class TestMapReduceDriver {
       }
     });
 
-    driver.withKeyGroupingComparator(new GroupComparator());
-    driver.withKeyOrderComparator(new OrderComparator());
+    driver.withKeyGroupingComparator(new FirstCharComparator());
+    driver.withKeyOrderComparator(new SecondCharComparator());
 
     driver.addInput(new Text("a1"), new LongWritable(1));
     driver.addInput(new Text("b1"), new LongWritable(1));
     driver.addInput(new Text("a3"), new LongWritable(3));
     driver.addInput(new Text("a2"), new LongWritable(2));
 
-    driver.addOutput(new Text("a1"), new LongWritable(0x1 | (0x2 << 8)
-        | (0x3 << 16)));
+    driver.addOutput(new Text("a1"), new LongWritable(0x1));
     driver.addOutput(new Text("b1"), new LongWritable(0x1));
+    driver.addOutput(new Text("a2"), new LongWritable(0x2 | (0x3 << 8)));
 
     driver.runTest();
   }
@@ -520,4 +520,39 @@ public class TestMapReduceDriver {
     assertNotNull(mapper.getMapInputPath());
     assertEquals(mapInputPath.getName(), mapper.getMapInputPath().getName());
   }
+
+  @Test
+  public void testGroupingComparatorBehaviour1() throws IOException {
+    driver.withInput(new Text("A1"),new LongWritable(1L))
+      .withInput(new Text("A2"),new LongWritable(1L))
+      .withInput(new Text("B1"),new LongWritable(1L))
+      .withInput(new Text("B2"),new LongWritable(1L))
+      .withInput(new Text("C1"),new LongWritable(1L))
+      .withOutput(new Text("A1"),new LongWritable(2L))
+      .withOutput(new Text("B1"),new LongWritable(2L))
+      .withOutput(new Text("C1"),new LongWritable(1L))
+      .withKeyGroupingComparator(new FirstCharComparator())
+      .runTest(false);
+  }
+
+  @Test
+  public void testGroupingComparatorBehaviour2() throws IOException {
+    // this test fails pre-MRUNIT-127 because of the incorrect 
+    // grouping of reduce keys in "shuffle". 
+    // MapReduce doesn't group keys which aren't in a contiguous
+    // range when sorted by their sorting comparator. 
+    driver.withInput(new Text("1A"),new LongWritable(1L))
+      .withInput(new Text("2A"),new LongWritable(1L))
+      .withInput(new Text("1B"),new LongWritable(1L))
+      .withInput(new Text("2B"),new LongWritable(1L))
+      .withInput(new Text("1C"),new LongWritable(1L))
+      .withOutput(new Text("1A"),new LongWritable(1L))
+      .withOutput(new Text("2A"),new LongWritable(1L))
+      .withOutput(new Text("1B"),new LongWritable(1L))
+      .withOutput(new Text("2B"),new LongWritable(1L))
+      .withOutput(new Text("1C"),new LongWritable(1L))
+      .withKeyGroupingComparator(new SecondCharComparator())
+      .runTest(false);
+  }
+
 }