You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrunit.apache.org by br...@apache.org on 2012/07/29 18:07:37 UTC

svn commit: r1366869 - in /mrunit/trunk/src: main/java/org/apache/hadoop/mrunit/ main/java/org/apache/hadoop/mrunit/mapreduce/ test/java/org/apache/hadoop/mrunit/mapreduce/

Author: brock
Date: Sun Jul 29 16:07:36 2012
New Revision: 1366869

URL: http://svn.apache.org/viewvc?rev=1366869&view=rev
Log:
MRUNIT-64: Multiple Input Key, Value Pairs should be supported

Added:
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestWordCount.java
Modified:
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java Sun Jul 29 16:07:36 2012
@@ -27,13 +27,11 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
- * Harness that allows you to test a Mapper instance. You provide the input key
- * and value that should be sent to the Mapper, and outputs you expect to be
+ * Harness that allows you to test a Mapper instance. You provide the input
+ * (k, v)* pairs that should be sent to the Mapper, and outputs you expect to be
  * sent by the Mapper to the collector for those inputs. By calling runTest(),
  * the harness will deliver the input to the Mapper and will check its outputs
- * against the expected results. This is designed to handle a single (k, v) ->
- * (k, v)* case from the Mapper, representing a single unit test. Multiple input
- * (k, v) pairs should go in separate unit tests.
+ * against the expected results.
  */
 public abstract class MapDriverBase<K1, V1, K2, V2> extends
     TestDriver<K1, V1, K2, V2> {
@@ -41,7 +39,10 @@ public abstract class MapDriverBase<K1, 
   public static final Log LOG = LogFactory.getLog(MapDriverBase.class);
 
   protected List<Pair<K1, V1>> inputs = new ArrayList<Pair<K1, V1>>();
+  
+  @Deprecated
   protected K1 inputKey;
+  @Deprecated
   protected V1 inputVal;
 
   /**
@@ -49,10 +50,11 @@ public abstract class MapDriverBase<K1, 
    * 
    * @param key
    */
+  @Deprecated
   public void setInputKey(final K1 key) {
     inputKey = copy(key);
   }
-
+  @Deprecated
   public K1 getInputKey() {
     return inputKey;
   }
@@ -62,10 +64,11 @@ public abstract class MapDriverBase<K1, 
    * 
    * @param val
    */
+  @Deprecated
   public void setInputValue(final V1 val) {
     inputVal = copy(val);
   }
-
+  @Deprecated
   public V1 getInputValue() {
     return inputVal;
   }
@@ -75,8 +78,7 @@ public abstract class MapDriverBase<K1, 
    * 
    */
   public void setInput(final K1 key, final V1 val) {
-    setInputKey(key);
-    setInputValue(val);
+  	setInput(new Pair<K1, V1>(key, val));
   }
 
   /**
@@ -88,9 +90,63 @@ public abstract class MapDriverBase<K1, 
   public void setInput(final Pair<K1, V1> inputRecord) {
     setInputKey(inputRecord.getFirst());
     setInputValue(inputRecord.getSecond());
+
+    clearInput();
+    addInput(inputRecord);
   }
 
   /**
+   * Adds an input to send to the mapper
+   * 
+   * @param key
+   * @param val
+   */
+  public void addInput(final K1 key, final V1 val) {
+    inputs.add(copyPair(key, val));
+  }
+  
+  /**
+   * Adds an input to send to the mapper
+   * 
+   * @param input
+   *          a (K, V) pair
+   */
+  public void addInput(final Pair<K1, V1> input) {
+    addInput(input.getFirst(), input.getSecond());
+  }
+  
+  /**
+   * Adds list of inputs to send to the mapper
+   * 
+   * @param inputs
+   *          list of (K, V) pairs
+   */
+  public void addAll(final List<Pair<K1, V1>> inputs) {
+    for (Pair<K1, V1> input : inputs) {
+      addInput(input);
+    }
+  }
+  
+  /**
+   * Clears the list of inputs to send to the mapper
+   */
+  public void clearInput() {
+    inputs.clear();
+  }
+  
+  /**
+   * Adds output (k, v)* pairs we expect from the Mapper
+   * 
+   * @param outputRecords
+   *          The (k, v)* pairs to add
+   */
+  public void addAllOutput(final List<Pair<K2, V2>> outputRecords) {
+    for (Pair<K2, V2> output : outputRecords) {
+      addOutput(output);
+    }
+  }
+  
+  /**
    * Adds an output (k, v) pair we expect from the Mapper
    * 
    * @param outputRecord
@@ -145,7 +201,9 @@ public abstract class MapDriverBase<K1, 
 
   @Override
   public void runTest(final boolean orderMatters) throws IOException {
-    LOG.debug("Mapping input (" + inputKey + ", " + inputVal + ")");
+  	for (Pair<K1, V1> input : inputs) {
+  		LOG.debug("Mapping input (" + input.getFirst() + ", " + input.getSecond() + ")");
+  	}
     final List<Pair<K2, V2>> outputs = run();
     validate(outputs, orderMatters);
     validate(counterWrapper);

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Sun Jul 29 16:07:36 2012
@@ -78,6 +78,30 @@ public abstract class MapReduceDriverBas
   }
 
   /**
+   * Adds input to send to the mapper
+   * 
+   * @param inputs
+   *          List of (k, v) pairs to add to the input list
+   */
+  public void addAll(final List<Pair<K1, V1>> inputs) {
+    for (Pair<K1, V1> input : inputs) {
+      addInput(input);
+    }
+  }
+  
+  /**
+   * Adds output (k, v)* pairs we expect from the Reducer
+   * 
+   * @param outputRecords
+   *          List of (k, v) pairs to add
+   */
+  public void addAllOutput(final List<Pair<K3, V3>> outputRecords) {
+    for (Pair<K3, V3> output : outputRecords) {
+      addOutput(output);
+    }
+  }
+  
+  /**
    * Adds an output (k, v) pair we expect from the Reducer
    * 
    * @param outputRecord

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java Sun Jul 29 16:07:36 2012
@@ -33,14 +33,15 @@ import org.apache.hadoop.mrunit.types.Pa
  * sent to the Reducer (as if they came from a Mapper), and outputs you expect
  * to be sent by the Reducer to the collector. By calling runTest(), the harness
  * will deliver the input to the Reducer and will check its outputs against the
- * expected results. This is designed to handle a single (k, v*) -> (k, v)* case
- * from the Reducer, representing a single unit test. Multiple input (k, v*)
- * sets should go in separate unit tests.
+ * expected results. 
  */
 public abstract class ReduceDriverBase<K1, V1, K2, V2> extends
     TestDriver<K1, V1, K2, V2> {
 
+  protected List<Pair<K1, List<V1>>> inputs = new ArrayList<Pair<K1, List<V1>>>();
+  @Deprecated
   protected K1 inputKey;
+  @Deprecated
   private final List<V1> inputValues;
 
   public ReduceDriverBase() {
@@ -48,20 +49,20 @@ public abstract class ReduceDriverBase<K
   }
 
   /**
-   * Returns a list which when iterated over, returns the same instance of the
-   * value each time with different contents similar to how Hadoop currently
-   * works with Writables.
+   * Returns a list of values.
    * 
    * @return List of values
    */
+  @Deprecated
   public List<V1> getInputValues() {
-    return new ValueClassInstanceReuseList<V1>(inputValues, getConfiguration());
+    return inputValues;
   }
 
   /**
    * Sets the input key to send to the Reducer
    * 
    */
+  @Deprecated
   public void setInputKey(final K1 key) {
     inputKey = copy(key);
   }
@@ -71,6 +72,7 @@ public abstract class ReduceDriverBase<K
    * 
    * @param val
    */
+  @Deprecated
   public void addInputValue(final V1 val) {
     inputValues.add(copy(val));
   }
@@ -80,6 +82,7 @@ public abstract class ReduceDriverBase<K
    * 
    * @param values
    */
+  @Deprecated
   public void setInputValues(final List<V1> values) {
     inputValues.clear();
     addInputValues(values);
@@ -90,6 +93,7 @@ public abstract class ReduceDriverBase<K
    * 
    * @param values
    */
+  @Deprecated
   public void addInputValues(final List<V1> values) {
     for (V1 value : values) {
       addInputValue(value);
@@ -105,8 +109,70 @@ public abstract class ReduceDriverBase<K
   public void setInput(final K1 key, final List<V1> values) {
     setInputKey(key);
     setInputValues(values);
+    
+    clearInput();
+    addInput(key, values);
+  }
+  
+  /**
+   * Clears the input to be sent to the Reducer
+   */
+  public void clearInput() {
+    inputs.clear();
   }
+  
+  /**
+   * Add input (K, V*) to send to the Reducer
+   * 
+   * @param key
+   *          The key too add
+   * @param values
+   *          The list of values to add
+   */
+  public void addInput(final K1 key, final List<V1> values) {
+    List<V1> copyVals = new ArrayList<V1>();
+    for (V1 val : values) {
+      copyVals.add(copy(val));
+    }
 
+    inputs.add(new Pair<K1, List<V1>>(copy(key),
+        new ValueClassInstanceReuseList<V1>(copyVals, getConfiguration())));
+  }
+  
+  /**
+   * Add input (K, V*) to send to the Reducer
+   * 
+   * @param input
+   *          input pair
+   */
+  public void addInput(final Pair<K1, List<V1>> input) {
+    addInput(input.getFirst(), input.getSecond());
+  }
+
+  /**
+   * Adds input to send to the Reducer
+   * 
+   * @param inputs
+   *          list of (K, V*) pairs
+   */
+  public void addAll(final List<Pair<K1, List<V1>>> inputs) {
+    for (Pair<K1, List<V1>> input : inputs) {
+      addInput(input);
+    }
+  }
+  
+  /**
+   * Adds output (k, v)* pairs we expect from the Reducer
+   * 
+   * @param outputRecords
+   *          The (k, v)* pairs to add
+   */
+  public void addAllOutput(final List<Pair<K2, V2>> outputRecords) {
+    for (Pair<K2, V2> output : outputRecords) {
+      addOutput(output);
+    }
+  }
+  
   /**
    * Adds an output (k, v) pair we expect from the Reducer
    * 
@@ -167,10 +233,14 @@ public abstract class ReduceDriverBase<K
 
   @Override
   public void runTest(final boolean orderMatters) throws IOException {
-    final StringBuilder sb = new StringBuilder();
-    formatValueList(inputValues, sb);
-
-    LOG.debug("Reducing input (" + inputKey + ", " + sb + ")");
+    if (LOG.isDebugEnabled()) {
+      final StringBuilder sb = new StringBuilder();
+      for (Pair<K1, List<V1>> input : inputs) {
+        formatValueList(input.getSecond(), sb);
+        LOG.debug("Reducing input (" + input.getFirst() + ", " + sb + ")");
+        sb.delete(0, sb.length());
+      }
+    }
 
     List<Pair<K2, V2>> outputs = null;
     outputs = run();

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java Sun Jul 29 16:07:36 2012
@@ -39,13 +39,11 @@ import org.apache.hadoop.mrunit.internal
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
- * Harness that allows you to test a Mapper instance. You provide the input key
- * and value that should be sent to the Mapper, and outputs you expect to be
+ * Harness that allows you to test a Mapper instance. You provide the input
+ * (k, v)* pairs that should be sent to the Mapper, and outputs you expect to be
  * sent by the Mapper to the collector for those inputs. By calling runTest(),
  * the harness will deliver the input to the Mapper and will check its outputs
- * against the expected results. This is designed to handle a single (k, v) ->
- * (k, v)* case from the Mapper, representing a single unit test. Multiple input
- * (k, v) pairs should go in separate unit tests.
+ * against the expected results.
  */
 public class MapDriver<K1, V1, K2, V2> 
 extends MapDriverBase<K1, V1, K2, V2> implements ContextDriver {
@@ -120,6 +118,7 @@ extends MapDriverBase<K1, V1, K2, V2> im
    * 
    * @return this
    */
+  @Deprecated
   public MapDriver<K1, V1, K2, V2> withInputKey(final K1 key) {
     setInputKey(key);
     return this;
@@ -131,6 +130,7 @@ extends MapDriverBase<K1, V1, K2, V2> im
    * @param val
    * @return this
    */
+  @Deprecated
   public MapDriver<K1, V1, K2, V2> withInputValue(final V1 val) {
     setInputValue(val);
     return this;
@@ -158,6 +158,17 @@ extends MapDriverBase<K1, V1, K2, V2> im
   }
 
   /**
+   * Identical to addAll() but returns self for fluent programming style
+   * 
+   * @param inputRecords
+   * @return this
+   */
+  public MapDriver<K1, V1, K2, V2> withAll(final List<Pair<K1, V1>> inputRecords) {
+    addAll(inputRecords);
+    return this;
+  }
+  
+  /**
    * Works like addOutput(), but returns self for fluent style
    * 
    * @param outputRecord
@@ -179,6 +190,18 @@ extends MapDriverBase<K1, V1, K2, V2> im
   }
 
   /**
+   * Functions like addAllOutput() but returns self for fluent programming style
+   * 
+   * @param outputRecords
+   * @return this
+   */
+  public MapDriver<K1, V1, K2, V2> withAllOutput(
+      final List<Pair<K2, V2>> outputRecords) {
+    addAllOutput(outputRecords);
+    return this;
+  }
+  
+  /**
    * Identical to setInputFromString, but with a fluent programming style
    * 
    * @param input
@@ -224,15 +247,18 @@ extends MapDriverBase<K1, V1, K2, V2> im
 
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
-    if (inputKey == null || inputVal == null) {
+    // handle inputKey and inputVal for backwards compatibility
+    if (inputKey != null && inputVal != null) {
+      setInput(inputKey, inputVal);
+    }
+
+    if (inputs == null || inputs.size() == 0) {
       throw new IllegalStateException("No input was provided");
     }
+   	
     if (myMapper == null) {
       throw new IllegalStateException("No Mapper class was provided");
-    }
-    
-    inputs.clear();
-    inputs.add(new Pair<K1, V1>(inputKey, inputVal));
+    }        
 
     try {
       myMapper.run(wrapper.getMockContext());
@@ -262,7 +288,7 @@ extends MapDriverBase<K1, V1, K2, V2> im
   /**
    * @param mapInputPath
    *       The Path object which will be given to the mapper
-   * @return
+   * @return this object for fluent coding
    */
   public MapDriver<K1, V1, K2, V2> withMapInputPath(Path mapInputPath) {
     setMapInputPath(mapInputPath);

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java Sun Jul 29 16:07:36 2012
@@ -214,6 +214,31 @@ public class MapReduceDriver<K1, V1, K2,
   }
 
   /**
+   * Identical to addAll() but returns self for fluent programming style
+   * 
+   * @param inputs
+   *          List of (k, v) pairs to add
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withAll(
+      final List<Pair<K1, V1>> inputs) {
+    addAll(inputs);
+    return this;
+  }
+  
+  /**
+   * Works like addAllOutput(), but returns self for fluent style
+   * 
+   * @param outputRecords
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withAllOutput(
+      final List<Pair<K3, V3>> outputRecords) {
+    addAllOutput(outputRecords);
+    return this;
+  }
+  
+  /**
    * Works like addOutput(), but returns self for fluent style
    * 
    * @param outputRecord
@@ -296,25 +321,29 @@ public class MapReduceDriver<K1, V1, K2,
 
       final List<Pair<OUTKEY, OUTVAL>> reduceOutputs = new ArrayList<Pair<OUTKEY, OUTVAL>>();
 
-      for (final Pair<K2, List<V2>> input : inputs) {
-        final K2 inputKey = input.getFirst();
-        final List<V2> inputValues = input.getSecond();
-        final StringBuilder sb = new StringBuilder();
-        formatValueList(inputValues, sb);
-        LOG.debug("Reducing input (" + inputKey.toString() + ", "
-            + sb.toString() + ")");
+      if (!inputs.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          final StringBuilder sb = new StringBuilder();
+          for (Pair<K2, List<V2>> input : inputs) {
+            formatValueList(input.getSecond(), sb);
+            LOG.debug("Reducing input (" + input.getFirst() + ", " + sb + ")");
+            sb.delete(0, sb.length());
+          }
+        }
 
         final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
             .newReduceDriver(reducer).withCounters(getCounters())
-            .withConfiguration(configuration).withInputKey(inputKey)
-            .withInputValues(inputValues);
+            .withConfiguration(configuration).withAll(inputs);
+
         if (getOutputCopyingOrInputFormatConfiguration() != null) {
           reduceDriver
               .withOutputCopyingOrInputFormatConfiguration(getOutputCopyingOrInputFormatConfiguration());
         }
+
         if (outputFormatClass != null) {
           reduceDriver.withOutputFormat(outputFormatClass, inputFormatClass);
         }
+
         reduceOutputs.addAll(reduceDriver.run());
       }
 
@@ -337,13 +366,10 @@ public class MapReduceDriver<K1, V1, K2,
     List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();
 
     // run map component
-    for (final Pair<K1, V1> input : inputList) {
-      LOG.debug("Mapping input " + input.toString() + ")");
-
-      mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
-          .withCounters(getCounters()).withConfiguration(configuration)
-          .withInput(input).withMapInputPath(getMapInputPath()).run());
-    }
+    LOG.debug("Starting map phase with mapper: " + myMapper);
+    mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
+        .withCounters(getCounters()).withConfiguration(configuration)
+        .withAll(inputList).withMapInputPath(getMapInputPath()).run());      
 
     if (myCombiner != null) {
       // User has specified a combiner. Run this and replace the mapper outputs
@@ -379,7 +405,7 @@ public class MapReduceDriver<K1, V1, K2,
   /**
    * @param mapInputPath
    *       The Path object which will be given to the mapper
-   * @return
+   * @return this
    */
   public MapReduceDriver<K1, V1, K2, V2, K3, V3> withMapInputPath(Path mapInputPath) {
     setMapInputPath(mapInputPath);

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java Sun Jul 29 16:07:36 2012
@@ -18,10 +18,9 @@
 
 package org.apache.hadoop.mrunit.mapreduce;
 
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -44,9 +43,7 @@ import org.apache.hadoop.mrunit.types.Pa
  * sent to the Reducer (as if they came from a Mapper), and outputs you expect
  * to be sent by the Reducer to the collector. By calling runTest(), the harness
  * will deliver the input to the Reducer and will check its outputs against the
- * expected results. This is designed to handle a single (k, v*) -> (k, v)* case
- * from the Reducer, representing a single unit test. Multiple input (k, v*)
- * sets should go in separate unit tests.
+ * expected results.
  */
 public class ReduceDriver<K1, V1, K2, V2> extends
     ReduceDriverBase<K1, V1, K2, V2> implements ContextDriver {
@@ -57,7 +54,6 @@ public class ReduceDriver<K1, V1, K2, V2
   private Counters counters;
 
   private final MockOutputCreator<K2, V2> mockOutputCreator = new MockOutputCreator<K2, V2>();
-  private final List<Pair<K1, List<V1>>> inputs = new ArrayList<Pair<K1, List<V1>>>();
   private final MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
       inputs, mockOutputCreator, this);
 
@@ -126,6 +122,7 @@ public class ReduceDriver<K1, V1, K2, V2
    * 
    * @return this
    */
+  @Deprecated
   public ReduceDriver<K1, V1, K2, V2> withInputKey(final K1 key) {
     setInputKey(key);
     return this;
@@ -137,6 +134,7 @@ public class ReduceDriver<K1, V1, K2, V2
    * @param val
    * @return this
    */
+  @Deprecated
   public ReduceDriver<K1, V1, K2, V2> withInputValue(final V1 val) {
     addInputValue(val);
     return this;
@@ -148,6 +146,7 @@ public class ReduceDriver<K1, V1, K2, V2
    * @param values
    * @return this
    */
+  @Deprecated
   public ReduceDriver<K1, V1, K2, V2> withInputValues(final List<V1> values) {
     addInputValues(values);
     return this;
@@ -168,6 +167,29 @@ public class ReduceDriver<K1, V1, K2, V2
   }
 
   /**
+   * Identical to addInput() but returns self for fluent programming style
+   * 
+   * @param input
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withInput(final Pair<K1, List<V1>> input) {
+    addInput(input);
+    return this;
+  }
+
+  /**
+   * Identical to addAll() but returns self for fluent programming style
+   * 
+   * @param inputs
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withAll(
+      final List<Pair<K1, List<V1>>> inputs) {
+    addAll(inputs);
+    return this;
+  }
+  
+  /**
    * Works like addOutput(), but returns self for fluent style
    * 
    * @param outputRecord
@@ -193,6 +215,18 @@ public class ReduceDriver<K1, V1, K2, V2
   }
 
   /**
+   * Works like addAllOutput(), but returns self for fluent style
+   * 
+   * @param outputRecord
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withAllOutput(
+      final List<Pair<K2, V2>> outputRecords) {
+    addAllOutput(outputRecords);
+    return this;
+  }
+  
+  /**
    * Identical to setInput, but with a fluent programming style
    * 
    * @param input
@@ -238,16 +272,20 @@ public class ReduceDriver<K1, V1, K2, V2
 
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
-    if (inputKey == null || getInputValues().isEmpty()) {
+    // handle inputKey and inputValues for backwards compatibility
+    if (inputKey != null && !getInputValues().isEmpty()) {
+      clearInput();
+      addInput(inputKey, getInputValues());
+    }
+    
+    if (inputs.isEmpty()) {
       throw new IllegalStateException("No input was provided");
     }
+    
     if (myReducer == null) {
       throw new IllegalStateException("No Reducer class was provided");
     }
 
-    inputs.clear();
-    inputs.add(new Pair<K1, List<V1>>(inputKey, getInputValues()));
-
     try {
       myReducer.run(wrapper.getMockContext());
       return wrapper.getOutputs();

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java Sun Jul 29 16:07:36 2012
@@ -179,6 +179,19 @@ public class TestMapDriver {
   }
 
   @Test
+  public void testAddAll() throws IOException {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("foo"), new Text("bar")));
+    inputs.add(new Pair<Text, Text>(new Text("foo"), new Text("bar")));
+
+    final List<Pair<Text, Text>> outputs = new ArrayList<Pair<Text, Text>>();
+    outputs.add(new Pair<Text, Text>(new Text("foo"), new Text("bar")));
+    outputs.add(new Pair<Text, Text>(new Text("foo"), new Text("bar")));
+
+    driver.withAll(inputs).withAllOutput(outputs).runTest();
+  }
+  
+  @Test
   public void testSetInput() {
     driver.setInput(new Pair<Text, Text>(new Text("foo"), new Text("bar")));
 

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java Sun Jul 29 16:07:36 2012
@@ -125,6 +125,20 @@ public class TestMapReduceDriver {
   }
 
   @Test
+  public void testAddAll() throws IOException {
+    final List<Pair<Text, LongWritable>> inputs = new ArrayList<Pair<Text, LongWritable>>();
+    inputs.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(FOO_IN_A)));
+    inputs.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(FOO_IN_B)));
+    inputs.add(new Pair<Text, LongWritable>(new Text("bar"), new LongWritable(BAR_IN)));
+
+    final List<Pair<Text, LongWritable>> outputs = new ArrayList<Pair<Text, LongWritable>>();
+    outputs.add(new Pair<Text, LongWritable>(new Text("bar"), new LongWritable(BAR_IN)));
+    outputs.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(FOO_OUT)));
+
+    driver.withAll(inputs).withAllOutput(outputs).runTest();
+  }
+  
+  @Test
   public void testTestRun3OrderInsensitive() throws IOException {
     driver.withInput(new Text("foo"), new LongWritable(FOO_IN_A))
         .withInput(new Text("bar"), new LongWritable(BAR_IN))

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java Sun Jul 29 16:07:36 2012
@@ -234,6 +234,22 @@ public class TestReduceDriver {
   }
 
   @Test
+  public void testAddAll() throws IOException {
+    final List<LongWritable> vals = new ArrayList<LongWritable>();
+    vals.add(new LongWritable(IN_A));
+    vals.add(new LongWritable(IN_B));
+
+    final List<Pair<Text, List<LongWritable>>> inputs = new ArrayList<Pair<Text, List<LongWritable>>>();
+    inputs.add(new Pair<Text, List<LongWritable>>(new Text("foo"), vals));
+
+    final List<Pair<Text, LongWritable>> expected = new ArrayList<Pair<Text, LongWritable>>();
+    expected.add(new Pair<Text, LongWritable>(new Text("foo"),
+        new LongWritable(OUT_VAL)));
+
+    driver.withAll(inputs).withAllOutput(expected).runTest();
+  }
+  
+  @Test
   public void testNoInput() throws IOException {
     driver = ReduceDriver.newReduceDriver();
     thrown.expectMessage(IllegalStateException.class, "No input was provided");

Added: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestWordCount.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestWordCount.java?rev=1366869&view=auto
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestWordCount.java (added)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestWordCount.java Sun Jul 29 16:07:36 2012
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestWordCount {
+  private static final String FILE01 = "Hello World Bye World";
+  private static final String FILE02 = "Hello Hadoop Goodbye Hadoop";
+  private static final int ONE = 1;
+  private static final int TWO = 2;
+  
+  private Mapper<LongWritable, Text, Text, IntWritable> mapper;
+  private Reducer<Text, IntWritable, Text, IntWritable> reducer;
+  private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver;
+  private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
+  private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
+  private List<Pair<Text, IntWritable>> expectedOutput;
+  
+  @Before
+  public void setup() {
+    mapper = new Map();
+    reducer = new Reduce();
+    driver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
+    mapDriver = MapDriver.newMapDriver(mapper);
+    reduceDriver = ReduceDriver.newReduceDriver(reducer);
+    
+    expectedOutput = new ArrayList<Pair<Text, IntWritable>>();
+    expectedOutput.add(new Pair<Text, IntWritable>(new Text("Bye"), new IntWritable(ONE)));
+    expectedOutput.add(new Pair<Text, IntWritable>(new Text("Goodbye"), new IntWritable(ONE)));
+    expectedOutput.add(new Pair<Text, IntWritable>(new Text("Hadoop"), new IntWritable(TWO)));
+    expectedOutput.add(new Pair<Text, IntWritable>(new Text("Hello"), new IntWritable(TWO)));
+    expectedOutput.add(new Pair<Text, IntWritable>(new Text("World"), new IntWritable(TWO)));
+  }
+
+  @Test
+  public void TestMapDriver() throws IOException {
+    final List<Pair<LongWritable, Text>> inputs = new ArrayList<Pair<LongWritable, Text>>();
+    inputs.add(new Pair<LongWritable, Text>(new LongWritable(21), new Text(FILE01)));
+    inputs.add(new Pair<LongWritable, Text>(new LongWritable(48), new Text(FILE02)));
+
+    final List<Pair<Text, IntWritable>> outputs = new ArrayList<Pair<Text, IntWritable>>();
+    outputs.add(new Pair<Text, IntWritable>(new Text("Hello"), new IntWritable(ONE)));
+    outputs.add(new Pair<Text, IntWritable>(new Text("World"), new IntWritable(ONE)));
+    outputs.add(new Pair<Text, IntWritable>(new Text("Bye"), new IntWritable(ONE)));
+    outputs.add(new Pair<Text, IntWritable>(new Text("World"), new IntWritable(ONE)));
+    outputs.add(new Pair<Text, IntWritable>(new Text("Hello"), new IntWritable(ONE)));
+    outputs.add(new Pair<Text, IntWritable>(new Text("Hadoop"), new IntWritable(ONE)));
+    outputs.add(new Pair<Text, IntWritable>(new Text("Goodbye"), new IntWritable(ONE)));
+    outputs.add(new Pair<Text, IntWritable>(new Text("Hadoop"), new IntWritable(ONE)));
+
+    mapDriver.withAll(inputs).withAllOutput(outputs).runTest(true);
+  }
+
+  @Test
+  public void TestReduceDriver() throws IOException {
+    final List<IntWritable> input1 = new ArrayList<IntWritable>();
+    input1.add(new IntWritable(ONE));
+    
+    final List<IntWritable> input2 = new ArrayList<IntWritable>();
+    input2.add(new IntWritable(ONE));
+    input2.add(new IntWritable(ONE));
+    
+    final List<Pair<Text, List<IntWritable>>> inputs = new ArrayList<Pair<Text, List<IntWritable>>>();
+    inputs.add(new Pair<Text, List<IntWritable>>(new Text("Bye"), input1));
+    inputs.add(new Pair<Text, List<IntWritable>>(new Text("Goodbye"), input1));
+    inputs.add(new Pair<Text, List<IntWritable>>(new Text("Hadoop"), input2));
+    inputs.add(new Pair<Text, List<IntWritable>>(new Text("Hello"), input2));    
+    inputs.add(new Pair<Text, List<IntWritable>>(new Text("World"), input2));
+    
+    reduceDriver.withAll(inputs).withAllOutput(expectedOutput).runTest(true);
+  }
+  
+  @Test
+  public void TestRun() throws IOException {
+    final List<Pair<LongWritable, Text>> inputs = new ArrayList<Pair<LongWritable, Text>>();
+    inputs.add(new Pair<LongWritable, Text>(new LongWritable(21), new Text(FILE01)));
+    inputs.add(new Pair<LongWritable, Text>(new LongWritable(48), new Text(FILE02)));
+
+    driver.withAll(inputs).withAllOutput(expectedOutput).runTest(true);
+  }
+
+  /**
+   * Word count mapper
+   */
+  public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
+    private final IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+
+      String line = value.toString();
+      StringTokenizer tokenizer = new StringTokenizer(line);
+
+      while (tokenizer.hasMoreTokens()) {
+        word.set(tokenizer.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  /**
+   * Word count reducer
+   */
+  public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
+    public void reduce(Text key, Iterable<IntWritable> values, Context context)
+        throws IOException, InterruptedException {
+
+      int sum = 0;
+
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+
+      context.write(key, new IntWritable(sum));
+    }
+  }
+
+}
\ No newline at end of file