You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/04/25 22:48:18 UTC

svn commit: r937871 - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/ src/contrib/mrunit/src/test...

Author: cdouglas
Date: Sun Apr 25 20:48:17 2010
New Revision: 937871

URL: http://svn.apache.org/viewvc?rev=937871&view=rev
Log:
MAPREDUCE-1570. Add grouping comparators to MRUnit. Contributed by Chris White

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Apr 25 20:48:17 2010
@@ -267,6 +267,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1304. Add a task counter tracking time spent in GC. (Aaron
     Kimball via cdouglas)
 
+    MAPREDUCE-1570. Add grouping comparators to MRUnit. (Chris White via
+    cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java Sun Apr 25 20:48:17 2010
@@ -21,6 +21,7 @@ package org.apache.hadoop.mrunit;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +29,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Mapper;
@@ -48,7 +50,7 @@ import org.apache.hadoop.mrunit.types.Pa
  * If a combiner is specified, then it will be run exactly once after
  * the Mapper and before the Reducer.
  */
-public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
+public class MapReduceDriver<K1, V1, K2 extends Comparable, V2, K3, V3>
     extends MapReduceDriverBase<K1, V1, K2, V2, K3, V3> {
 
   public static final Log LOG = LogFactory.getLog(MapReduceDriver.class);
@@ -288,4 +290,30 @@ public class MapReduceDriver<K1, V1, K2,
   public String toString() {
     return "MapReduceDriver (" + myMapper + ", " + myReducer + ")";
   }
+  
+  /**
+   * Fluent variant of {@link #setKeyGroupingComparator(RawComparator)}: sets
+   * the comparator and returns this driver so that calls can be chained.
+   * @param groupingComparator Comparator to use in the shuffle stage for key
+   * grouping
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withKeyGroupingComparator(
+      RawComparator<K2> groupingComparator) {
+    setKeyGroupingComparator(groupingComparator);
+    return this;
+  }
+  
+  /**
+   * Fluent variant of {@link #setKeyOrderComparator(RawComparator)}: sets
+   * the comparator and returns this driver so that calls can be chained.
+   * @param orderComparator Comparator to use in the shuffle stage for key
+   * value ordering
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withKeyOrderComparator(
+      RawComparator<K2> orderComparator) {
+    setKeyOrderComparator(orderComparator);
+    return this;
+  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Sun Apr 25 20:48:17 2010
@@ -21,14 +21,16 @@ package org.apache.hadoop.mrunit;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.TreeMap;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -41,15 +43,33 @@ import org.apache.hadoop.mrunit.types.Pa
  * This is designed to handle a single (k, v)* -> (k, v)* case from the
  * Mapper/Reducer pair, representing a single unit test.
  */
-public abstract class MapReduceDriverBase<K1, V1, K2, V2, K3, V3>
+public abstract class MapReduceDriverBase<K1, V1, K2 extends Comparable, V2, K3, V3>
     extends TestDriver<K1, V1, K3, V3> {
 
   public static final Log LOG = LogFactory.getLog(MapReduceDriverBase.class);
 
   protected List<Pair<K1, V1>> inputList;
+  
+  /** Key group comparator */
+  protected Comparator<K2> keyGroupComparator;
+  
+  /** Key value order comparator */
+  protected Comparator<K2> keyValueOrderComparator;
 
   public MapReduceDriverBase() {
     inputList = new ArrayList<Pair<K1, V1>>();
+    
+    // default comparator: delegates to the keys' own compareTo (natural ordering)
+    Comparator<K2> simpleComparator = new Comparator<K2>() {
+      @Override
+      public int compare(K2 o1, K2 o2) {
+        return o1.compareTo(o2);
+      }
+    };
+    
+    // group by natural ordering; the null order comparator makes shuffle() skip the per-group sort
+    keyGroupComparator = simpleComparator;
+    keyValueOrderComparator = null;
   }
 
   /**
@@ -158,38 +178,82 @@ public abstract class MapReduceDriverBas
    * @return the sorted list of (key, list(val))'s to present to the reducer
    */
   public List<Pair<K2, List<V2>>> shuffle(List<Pair<K2, V2>> mapOutputs) {
-    HashMap<K2, List<V2>> reducerInputs = new HashMap<K2, List<V2>>();
-
-    // step 1: condense all values with the same key into a list.
+    // step 1 - bucket the map outputs in a TreeMap ordered by the key group comparator
+    final TreeMap<K2, List<Pair<K2,V2>>> groupedByKey = 
+        new TreeMap<K2, List<Pair<K2,V2>>>(keyGroupComparator);
+    
+    List<Pair<K2,V2>> groupedKeyList;
     for (Pair<K2, V2> mapOutput : mapOutputs) {
-      List<V2> valuesForKey = reducerInputs.get(mapOutput.getFirst());
-
-      if (null == valuesForKey) {
-        // this is the first (k, v) pair for this key. Add it to the list.
-        valuesForKey = new ArrayList<V2>();
-        valuesForKey.add(mapOutput.getSecond());
-        reducerInputs.put(mapOutput.getFirst(), valuesForKey);
-      } else {
-        // add this value to the existing list for this key
-        valuesForKey.add(mapOutput.getSecond());
+      groupedKeyList = groupedByKey.get(mapOutput.getFirst());
+      
+      if (groupedKeyList == null) {
+        groupedKeyList = new ArrayList<Pair<K2,V2>>();
+        groupedByKey.put(mapOutput.getFirst(), groupedKeyList);
       }
+      
+      groupedKeyList.add(mapOutput);
     }
-
-    // build a list out of these (k, list(v)) pairs
-    List<Pair<K2, List<V2>>> finalInputs = new ArrayList<Pair<K2, List<V2>>>();
-    Set<Map.Entry<K2, List<V2>>> entries = reducerInputs.entrySet();
-    for (Map.Entry<K2, List<V2>> entry : entries) {
-      K2 key = entry.getKey();
-      List<V2> vals = entry.getValue();
-      finalInputs.add(new Pair<K2, List<V2>>(key, vals));
-    }
-
-    // and then sort the output list by key
-    if (finalInputs.size() > 0) {
-      Collections.sort(finalInputs,
-              finalInputs.get(0).new FirstElemComparator());
-    }
-
-    return finalInputs;
+    
+    // step 2 - comparator ordering pairs by key; only invoked below when the order comparator is set
+    Comparator<Pair<K2,V2>> pairKeyComparator = new Comparator<Pair<K2, V2>>() {
+      @Override
+      public int compare(Pair<K2, V2> o1, Pair<K2, V2> o2) {
+        return keyValueOrderComparator.compare(o1.getFirst(), o2.getFirst());
+      }
+    };
+    
+    // shuffle stage output list, filled in the TreeMap's iteration order
+    List<Pair<K2, List<V2>>> outputKeyValuesList = 
+        new ArrayList<Pair<K2,List<V2>>>();
+    
+    // populate output list
+    for (Entry<K2, List<Pair<K2, V2>>> groupedByKeyEntry : 
+          groupedByKey.entrySet()) {
+      if (keyValueOrderComparator != null) {
+        // sort this group's pairs by key (without a comparator they stay in input order)
+        Collections.sort(groupedByKeyEntry.getValue(), pairKeyComparator);
+      }
+      
+      // create list to hold values for the grouped key
+      List<V2> valuesList = new ArrayList<V2>();
+      for (Pair<K2, V2> pair : groupedByKeyEntry.getValue()) {
+        valuesList.add(pair.getSecond());
+      }
+      
+      // emit the group under the map's key, i.e. the first key seen for this group
+      outputKeyValuesList.add(
+          new Pair<K2,List<V2>>(groupedByKeyEntry.getKey(), valuesList));
+    }
+    
+    // return output list
+    return outputKeyValuesList;
+  }
+  
+  /**
+   * Set the key grouping comparator, similar to calling the following API
+   * calls but passing a real instance rather than just the class:
+   * <UL>
+   * <LI>pre 0.20.1 API: {@link org.apache.hadoop.mapred.JobConf#setOutputValueGroupingComparator(Class)}
+   * <LI>0.20.1+ API: {@link Job#setGroupingComparatorClass(Class)}
+   * </UL>
+   * @param groupingComparator comparator that {@link #shuffle(List)} uses to decide which keys share a group
+   */
+  public void setKeyGroupingComparator(
+        RawComparator<K2> groupingComparator) {
+    keyGroupComparator = groupingComparator;
+  }
+  
+  /**
+   * Set the key value order comparator, similar to calling the following API
+   * calls but passing a real instance rather than just the class:
+   * <UL>
+   * <LI>pre 0.20.1 API: {@link org.apache.hadoop.mapred.JobConf#setOutputKeyComparatorClass(Class)}
+   * <LI>0.20.1+ API: {@link Job#setSortComparatorClass(Class)}
+   * </UL>
+   * @param orderComparator comparator that {@link #shuffle(List)} uses to sort each group's pairs by key; if null, no sort is done
+   */
+  public void setKeyOrderComparator(
+        RawComparator<K2> orderComparator) {
+    keyValueOrderComparator = orderComparator;
   }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java Sun Apr 25 20:48:17 2010
@@ -21,6 +21,7 @@ package org.apache.hadoop.mrunit.mapredu
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +30,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -46,7 +48,7 @@ import org.apache.hadoop.mrunit.types.Pa
  * This is designed to handle a single (k, v)* -> (k, v)* case from the
  * Mapper/Reducer pair, representing a single unit test.
  */
-public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
+public class MapReduceDriver<K1, V1, K2 extends Comparable, V2, K3, V3>
     extends MapReduceDriverBase<K1, V1, K2, V2, K3, V3> {
 
   public static final Log LOG = LogFactory.getLog(MapReduceDriver.class);
@@ -240,4 +242,30 @@ public class MapReduceDriver<K1, V1, K2,
     setConfiguration(configuration);
     return this;
   }
+  
+  /**
+   * Fluent variant of {@link #setKeyGroupingComparator(RawComparator)}: sets
+   * the comparator and returns this driver so that calls can be chained.
+   * @param groupingComparator Comparator to use in the shuffle stage for key
+   * grouping
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withKeyGroupingComparator(
+      RawComparator<K2> groupingComparator) {
+    setKeyGroupingComparator(groupingComparator);
+    return this;
+  }
+  
+  /**
+   * Fluent variant of {@link #setKeyOrderComparator(RawComparator)}: sets
+   * the comparator and returns this driver so that calls can be chained.
+   * @param orderComparator Comparator to use in the shuffle stage for key
+   * value ordering
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withKeyOrderComparator(
+      RawComparator<K2> orderComparator) {
+    setKeyOrderComparator(orderComparator);
+    return this;
+  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java Sun Apr 25 20:48:17 2010
@@ -22,14 +22,20 @@ import static org.apache.hadoop.mrunit.t
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
@@ -274,5 +280,74 @@ public class TestMapReduceDriver extends
             .withOutput(new Text("foo"), new LongWritable(FOO_OUT))
             .runTest();
   }
+  
+  // Keys are grouped by first character; within a group, values are ordered by the key's second character
+  @Test
+  public void testComparators() {
+    // group comparator - group by first character
+    RawComparator groupComparator = new RawComparator() {
+      @Override
+      public int compare(Object o1, Object o2) {
+        return o1.toString().substring(0, 1).compareTo(
+            o2.toString().substring(0, 1));
+      }
+
+      @Override
+      public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
+          int arg4, int arg5) {
+        throw new RuntimeException("Not implemented");
+      }  
+    };
+    
+    // value order comparator - order by second character
+    RawComparator orderComparator = new RawComparator() {
+      @Override
+      public int compare(Object o1, Object o2) {
+        return o1.toString().substring(1, 2).compareTo(
+            o2.toString().substring(1, 2));
+      }
+      
+      @Override
+      public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
+          int arg4, int arg5) {
+        throw new RuntimeException("Not implemented");
+      }
+    };
+    
+    // reducer packs the i-th value it receives into byte i of the output, so the result encodes value order
+    driver.withReducer(new Reducer<Text, LongWritable, Text, LongWritable>() {
+      @Override
+      public void reduce(Text key, Iterator<LongWritable> values,
+          OutputCollector<Text, LongWritable> output, Reporter reporter)
+          throws IOException {
+        long outputValue = 0;
+        int count = 0;
+        while (values.hasNext()) {
+          outputValue |= (values.next().get() << (count++*8));
+        }
+        
+        output.collect(key, new LongWritable(outputValue));
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {}
+    });
+    
+    driver.withKeyGroupingComparator(groupComparator);
+    driver.withKeyOrderComparator(orderComparator);
+    
+    driver.addInput(new Text("a1"), new LongWritable(1));
+    driver.addInput(new Text("b1"), new LongWritable(1));
+    driver.addInput(new Text("a3"), new LongWritable(3));
+    driver.addInput(new Text("a2"), new LongWritable(2));
+    
+    driver.addOutput(new Text("a1"), new LongWritable(0x1 | (0x2 << 8) | (0x3 << 16)));
+    driver.addOutput(new Text("b1"), new LongWritable(0x1));
+    
+    driver.runTest();
+  }
 }
 

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java?rev=937871&r1=937870&r2=937871&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java Sun Apr 25 20:48:17 2010
@@ -22,6 +22,7 @@ import static org.apache.hadoop.mrunit.t
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 
 import junit.framework.TestCase;
@@ -29,9 +30,14 @@ import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Reducer.Context;
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.mrunit.mapreduce.TestMapDriver.ConfigurationMapper;
 import org.apache.hadoop.mrunit.mapreduce.TestReduceDriver.ConfigurationReducer;
@@ -256,5 +262,65 @@ public class TestMapReduceDriver extends
 	  assertEquals(reducer.setupConfiguration.get("TestKey"), "TestValue");	  
   }
 
+  // Keys are grouped by first character; within a group, values are ordered by the key's second character
+  @Test
+  public void testComparators() {
+    // group comparator - group by first character
+    RawComparator groupComparator = new RawComparator() {
+      @Override
+      public int compare(Object o1, Object o2) {
+        return o1.toString().substring(0, 1).compareTo(
+            o2.toString().substring(0, 1));
+      }
+
+      @Override
+      public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
+          int arg4, int arg5) {
+        throw new RuntimeException("Not implemented");
+      }  
+    };
+    
+    // value order comparator - order by second character
+    RawComparator orderComparator = new RawComparator() {
+      @Override
+      public int compare(Object o1, Object o2) {
+        return o1.toString().substring(1, 2).compareTo(
+            o2.toString().substring(1, 2));
+      }
+      
+      @Override
+      public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
+          int arg4, int arg5) {
+        throw new RuntimeException("Not implemented");
+      }
+    };
+    
+    // reducer packs the i-th value it receives into byte i of the output, so the result encodes value order
+    driver.withReducer(new Reducer<Text, LongWritable, Text, LongWritable>() {
+      protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+          throws IOException, InterruptedException {
+        long outputValue = 0;
+        int count = 0;
+        for (LongWritable value : values) {
+          outputValue |= (value.get() << (count++*8));
+        }
+        
+        context.write(key, new LongWritable(outputValue));
+      }
+    });
+    
+    driver.withKeyGroupingComparator(groupComparator);
+    driver.withKeyOrderComparator(orderComparator);
+    
+    driver.addInput(new Text("a1"), new LongWritable(1));
+    driver.addInput(new Text("b1"), new LongWritable(1));
+    driver.addInput(new Text("a3"), new LongWritable(3));
+    driver.addInput(new Text("a2"), new LongWritable(2));
+    
+    driver.addOutput(new Text("a1"), new LongWritable(0x1 | (0x2 << 8) | (0x3 << 16)));
+    driver.addOutput(new Text("b1"), new LongWritable(0x1));
+    
+    driver.runTest();
+  }
 }