You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/08/21 16:51:40 UTC

svn commit: r806577 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/ src/co...

Author: tomwhite
Date: Fri Aug 21 14:51:39 2009
New Revision: 806577

URL: http://svn.apache.org/viewvc?rev=806577&view=rev
Log:
MAPREDUCE-800. MRUnit should support the new API. Contributed by Aaron Kimball.

Added:
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriverBase.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriverBase.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockInputSplit.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContextWrapper.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockOutputCommitter.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockRawKeyValueIterator.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/AllTests.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/TestDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/AllTests.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=806577&r1=806576&r2=806577&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Aug 21 14:51:39 2009
@@ -62,6 +62,9 @@
     MAPREDUCE-814. Provide a way to configure completed job history files 
     to be on HDFS. (sharad)
 
+    MAPREDUCE-800. MRUnit should support the new API. (Aaron Kimball via
+    tomwhite)
+
   IMPROVEMENTS
 
     MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriver.java?rev=806577&r1=806576&r2=806577&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriver.java Fri Aug 21 14:51:39 2009
@@ -38,15 +38,12 @@
  * single (k, v) -> (k, v)* case from the Mapper, representing a single unit
  * test. Multiple input (k, v) pairs should go in separate unit tests.
  */
-public class MapDriver<K1, V1, K2, V2> extends TestDriver<K1, V1, K2, V2> {
+public class MapDriver<K1, V1, K2, V2> extends MapDriverBase<K1, V1, K2, V2> {
 
   public static final Log LOG = LogFactory.getLog(MapDriver.class);
 
   private Mapper<K1, V1, K2, V2> myMapper;
 
-  private K1 inputKey;
-  private V1 inputVal;
-
   public MapDriver(final Mapper<K1, V1, K2, V2> m) {
     myMapper = m;
   }
@@ -54,7 +51,6 @@
   public MapDriver() {
   }
 
-
   /**
    * Set the Mapper instance to use with this test driver
    *
@@ -78,18 +74,6 @@
   }
 
   /**
-   * Sets the input key to send to the mapper
-   *
-   */
-  public void setInputKey(K1 key) {
-    inputKey = key;
-  }
-
-  public K1 getInputKey() {
-    return inputKey;
-  }
-
-  /**
    * Identical to setInputKey() but with fluent programming style
    *
    * @return this
@@ -100,19 +84,6 @@
   }
 
   /**
-   * Sets the input value to send to the mapper
-   *
-   * @param val
-   */
-  public void setInputValue(V1 val) {
-    inputVal = val;
-  }
-
-  public V1 getInputValue() {
-    return inputVal;
-  }
-
-  /**
    * Identical to setInputValue() but with fluent programming style
    *
    * @param val
@@ -124,15 +95,6 @@
   }
 
   /**
-   * Sets the input to send to the mapper
-   *
-   */
-  public void setInput(K1 key, V1 val) {
-    setInputKey(key);
-    setInputValue(val);
-  }
-
-  /**
    * Identical to setInput() but returns self for fluent programming style
    *
    * @return this
@@ -143,21 +105,6 @@
   }
 
   /**
-   * Sets the input to send to the mapper
-   *
-   * @param inputRecord
-   *          a (key, val) pair
-   */
-  public void setInput(Pair<K1, V1> inputRecord) {
-    if (null != inputRecord) {
-      setInputKey(inputRecord.getFirst());
-      setInputValue(inputRecord.getSecond());
-    } else {
-      throw new IllegalArgumentException("null inputRecord in setInput()");
-    }
-  }
-
-  /**
    * Identical to setInput() but returns self for fluent programming style
    *
    * @param inputRecord
@@ -169,20 +116,6 @@
   }
 
   /**
-   * Adds an output (k, v) pair we expect from the Mapper
-   *
-   * @param outputRecord
-   *          The (k, v) pair to add
-   */
-  public void addOutput(Pair<K2, V2> outputRecord) {
-    if (null != outputRecord) {
-      expectedOutputs.add(outputRecord);
-    } else {
-      throw new IllegalArgumentException("Tried to add null outputRecord");
-    }
-  }
-
-  /**
    * Works like addOutput(), but returns self for fluent style
    *
    * @param outputRecord
@@ -194,14 +127,6 @@
   }
 
   /**
-   * Adds a (k, v) pair we expect as output from the mapper
-   *
-   */
-  public void addOutput(K2 key, V2 val) {
-    addOutput(new Pair<K2, V2>(key, val));
-  }
-
-  /**
    * Functions like addOutput() but returns self for fluent programming
    * style
    *
@@ -213,30 +138,6 @@
   }
 
   /**
-   * Expects an input of the form "key \t val" Forces the Mapper input types
-   * to Text.
-   *
-   * @param input
-   *          A string of the form "key \t val".
-   */
-  public void setInputFromString(String input) {
-    if (null == input) {
-      throw new IllegalArgumentException("null input given to setInputFromString");
-    } else {
-      Pair<Text, Text> inputPair = parseTabbedPair(input);
-      if (null != inputPair) {
-        // I know this is not type-safe, but I don't know a better way to do
-        // this.
-        setInputKey((K1) inputPair.getFirst());
-        setInputValue((V1) inputPair.getSecond());
-      } else {
-        throw new IllegalArgumentException(
-            "Could not parse input pair in setInputFromString");
-      }
-    }
-  }
-
-  /**
    * Identical to setInputFromString, but with a fluent programming style
    *
    * @param input
@@ -249,28 +150,6 @@
   }
 
   /**
-   * Expects an input of the form "key \t val" Forces the Mapper output types
-   * to Text.
-   *
-   * @param output
-   *          A string of the form "key \t val". Trims any whitespace.
-   */
-  public void addOutputFromString(String output) {
-    if (null == output) {
-      throw new IllegalArgumentException("null input given to setOutput");
-    } else {
-      Pair<Text, Text> outputPair = parseTabbedPair(output);
-      if (null != outputPair) {
-        // I know this is not type-safe, but I don't know a better way to do
-        // this.
-        addOutput((Pair<K2, V2>) outputPair);
-      } else {
-        throw new IllegalArgumentException("Could not parse output pair in setOutput");
-      }
-    }
-  }
-
-  /**
    * Identical to addOutputFromString, but with a fluent programming style
    *
    * @param output
@@ -294,33 +173,6 @@
   }
 
   @Override
-  public void runTest() throws RuntimeException {
-    String inputKeyStr = "(null)";
-    String inputValStr = "(null)";
-
-    if (null != inputKey) {
-      inputKeyStr = inputKey.toString();
-    }
-
-    if (null != inputVal) {
-      inputValStr = inputVal.toString();
-    }
-
-    LOG.debug("Mapping input (" + inputKeyStr + ", " + inputValStr + ")");
-
-    List<Pair<K2, V2>> outputs = null;
-
-    try {
-      outputs = run();
-      validate(outputs);
-    } catch (IOException ioe) {
-      LOG.error("IOException in mapper: " + ioe.toString());
-      LOG.debug("Setting success to false based on IOException");
-      throw new RuntimeException();
-    }
-  }
-
-  @Override
   public String toString() {
     return "MapDriver (" + myMapper + ")";
   }

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriverBase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriverBase.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriverBase.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriverBase.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit;
+
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mock.MockOutputCollector;
+import org.apache.hadoop.mrunit.mock.MockReporter;
+import org.apache.hadoop.mrunit.types.Pair;
+
+/**
+ * Harness that allows you to test a Mapper instance. You provide the input
+ * key and value that should be sent to the Mapper, and outputs you expect to
+ * be sent by the Mapper to the collector for those inputs. By calling
+ * runTest(), the harness will deliver the input to the Mapper and will check
+ * its outputs against the expected results. This is designed to handle a
+ * single (k, v) -> (k, v)* case from the Mapper, representing a single unit
+ * test. Multiple input (k, v) pairs should go in separate unit tests.
+ */
+public abstract class MapDriverBase<K1, V1, K2, V2> extends TestDriver<K1, V1, K2, V2> {
+
+  public static final Log LOG = LogFactory.getLog(MapDriverBase.class);
+
+  protected K1 inputKey;
+  protected V1 inputVal;
+
+
+  /**
+   * Sets the input key to send to the mapper
+   *
+   */
+  public void setInputKey(K1 key) {
+    inputKey = key;
+  }
+
+  public K1 getInputKey() {
+    return inputKey;
+  }
+
+  /**
+   * Sets the input value to send to the mapper
+   *
+   * @param val
+   */
+  public void setInputValue(V1 val) {
+    inputVal = val;
+  }
+
+  public V1 getInputValue() {
+    return inputVal;
+  }
+
+  /**
+   * Sets the input to send to the mapper
+   *
+   */
+  public void setInput(K1 key, V1 val) {
+    setInputKey(key);
+    setInputValue(val);
+  }
+
+  /**
+   * Sets the input to send to the mapper
+   *
+   * @param inputRecord
+   *          a (key, val) pair
+   */
+  public void setInput(Pair<K1, V1> inputRecord) {
+    if (null != inputRecord) {
+      setInputKey(inputRecord.getFirst());
+      setInputValue(inputRecord.getSecond());
+    } else {
+      throw new IllegalArgumentException("null inputRecord in setInput()");
+    }
+  }
+
+  /**
+   * Adds an output (k, v) pair we expect from the Mapper
+   *
+   * @param outputRecord
+   *          The (k, v) pair to add
+   */
+  public void addOutput(Pair<K2, V2> outputRecord) {
+    if (null != outputRecord) {
+      expectedOutputs.add(outputRecord);
+    } else {
+      throw new IllegalArgumentException("Tried to add null outputRecord");
+    }
+  }
+
+  /**
+   * Adds a (k, v) pair we expect as output from the mapper
+   *
+   */
+  public void addOutput(K2 key, V2 val) {
+    addOutput(new Pair<K2, V2>(key, val));
+  }
+
+  /**
+   * Expects an input of the form "key \t val" Forces the Mapper input types
+   * to Text.
+   *
+   * @param input
+   *          A string of the form "key \t val".
+   */
+  public void setInputFromString(String input) {
+    if (null == input) {
+      throw new IllegalArgumentException("null input given to setInputFromString");
+    } else {
+      Pair<Text, Text> inputPair = parseTabbedPair(input);
+      if (null != inputPair) {
+        // I know this is not type-safe, but I don't know a better way to do
+        // this.
+        setInputKey((K1) inputPair.getFirst());
+        setInputValue((V1) inputPair.getSecond());
+      } else {
+        throw new IllegalArgumentException(
+            "Could not parse input pair in setInputFromString");
+      }
+    }
+  }
+
+  /**
+   * Expects an input of the form "key \t val" Forces the Mapper output types
+   * to Text.
+   *
+   * @param output
+   *          A string of the form "key \t val". Trims any whitespace.
+   */
+  public void addOutputFromString(String output) {
+    if (null == output) {
+      throw new IllegalArgumentException("null input given to setOutput");
+    } else {
+      Pair<Text, Text> outputPair = parseTabbedPair(output);
+      if (null != outputPair) {
+        // I know this is not type-safe, but I don't know a better way to do
+        // this.
+        addOutput((Pair<K2, V2>) outputPair);
+      } else {
+        throw new IllegalArgumentException("Could not parse output pair in setOutput");
+      }
+    }
+  }
+
+  public abstract List<Pair<K2, V2>> run() throws IOException;
+
+  @Override
+  public void runTest() throws RuntimeException {
+    String inputKeyStr = "(null)";
+    String inputValStr = "(null)";
+
+    if (null != inputKey) {
+      inputKeyStr = inputKey.toString();
+    }
+
+    if (null != inputVal) {
+      inputValStr = inputVal.toString();
+    }
+
+    LOG.debug("Mapping input (" + inputKeyStr + ", " + inputValStr + ")");
+
+    List<Pair<K2, V2>> outputs = null;
+
+    try {
+      outputs = run();
+      validate(outputs);
+    } catch (IOException ioe) {
+      LOG.error("IOException in mapper: " + ioe.toString());
+      LOG.debug("Setting success to false based on IOException");
+      throw new RuntimeException();
+    }
+  }
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java?rev=806577&r1=806576&r2=806577&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java Fri Aug 21 14:51:39 2009
@@ -48,7 +48,7 @@
  * the Mapper and before the Reducer.
  */
 public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
-    extends TestDriver<K1, V1, K3, V3> {
+    extends MapReduceDriverBase<K1, V1, K2, V2, K3, V3> {
 
   public static final Log LOG = LogFactory.getLog(MapReduceDriver.class);
 
@@ -56,13 +56,10 @@
   private Reducer<K2, V2, K3, V3> myReducer;
   private Reducer<K2, V2, K2, V2> myCombiner;
 
-  private List<Pair<K1, V1>> inputList;
-
   public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
                          final Reducer<K2, V2, K3, V3> r) {
     myMapper = m;
     myReducer = r;
-    inputList = new ArrayList<Pair<K1, V1>>();
   }
 
   public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
@@ -71,11 +68,9 @@
     myMapper = m;
     myReducer = r;
     myCombiner = c;
-    inputList = new ArrayList<Pair<K1, V1>>();
   }
 
   public MapReduceDriver() {
-    inputList = new ArrayList<Pair<K1, V1>>();
   }
 
   /** Set the Mapper instance to use with this test driver
@@ -151,15 +146,6 @@
   }
 
   /**
-   * Adds an input to send to the mapper
-   * @param key
-   * @param val
-   */
-  public void addInput(K1 key, V1 val) {
-    inputList.add(new Pair<K1, V1>(key, val));
-  }
-
-  /**
    * Identical to addInput() but returns self for fluent programming style
    * @param key
    * @param val
@@ -171,18 +157,6 @@
   }
 
   /**
-   * Adds an input to send to the Mapper
-   * @param input The (k, v) pair to add to the input list.
-   */
-  public void addInput(Pair<K1, V1> input) {
-    if (null == input) {
-      throw new IllegalArgumentException("Null input in addInput()");
-    }
-
-    inputList.add(input);
-  }
-
-  /**
    * Identical to addInput() but returns self for fluent programming style
    * @param input The (k, v) pair to add
    * @return this
@@ -194,18 +168,6 @@
   }
 
   /**
-   * Adds an output (k, v) pair we expect from the Reducer
-   * @param outputRecord The (k, v) pair to add
-   */
-  public void addOutput(Pair<K3, V3> outputRecord) {
-    if (null != outputRecord) {
-      expectedOutputs.add(outputRecord);
-    } else {
-      throw new IllegalArgumentException("Tried to add null outputRecord");
-    }
-  }
-
-  /**
    * Works like addOutput(), but returns self for fluent style
    * @param outputRecord
    * @return this
@@ -217,15 +179,6 @@
   }
 
   /**
-   * Adds a (k, v) pair we expect as output from the Reducer
-   * @param key
-   * @param val
-   */
-  public void addOutput(K3 key, V3 val) {
-    addOutput(new Pair<K3, V3>(key, val));
-  }
-
-  /**
    * Functions like addOutput() but returns self for fluent programming style
    * @param key
    * @param val
@@ -237,26 +190,6 @@
   }
 
   /**
-   * Expects an input of the form "key \t val"
-   * Forces the Mapper input types to Text.
-   * @param input A string of the form "key \t val". Trims any whitespace.
-   */
-  public void addInputFromString(String input) {
-    if (null == input) {
-      throw new IllegalArgumentException("null input given to setInput");
-    } else {
-      Pair<Text, Text> inputPair = parseTabbedPair(input);
-      if (null != inputPair) {
-        // I know this is not type-safe, but I don't
-        // know a better way to do this.
-        addInput((Pair<K1, V1>) inputPair);
-      } else {
-        throw new IllegalArgumentException("Could not parse input pair in addInput");
-      }
-    }
-  }
-
-  /**
    * Identical to addInputFromString, but with a fluent programming style
    * @param input A string of the form "key \t val". Trims any whitespace.
    * @return this
@@ -267,27 +200,6 @@
   }
 
   /**
-   * Expects an input of the form "key \t val"
-   * Forces the Reducer output types to Text.
-   * @param output A string of the form "key \t val". Trims any whitespace.
-   */
-  public void addOutputFromString(String output) {
-    if (null == output) {
-      throw new IllegalArgumentException("null input given to setOutput");
-    } else {
-      Pair<Text, Text> outputPair = parseTabbedPair(output);
-      if (null != outputPair) {
-        // I know this is not type-safe,
-        // but I don't know a better way to do this.
-        addOutput((Pair<K3, V3>) outputPair);
-      } else {
-        throw new IllegalArgumentException(
-            "Could not parse output pair in setOutput");
-      }
-    }
-  }
-
-  /**
    * Identical to addOutputFromString, but with a fluent programming style
    * @param output A string of the form "key \t val". Trims any whitespace.
    * @return this
@@ -347,58 +259,7 @@
   }
 
   @Override
-  public void runTest() throws RuntimeException {
-    List<Pair<K3, V3>> reduceOutputs = null;
-    boolean succeeded;
-
-    try {
-      reduceOutputs = run();
-      validate(reduceOutputs);
-    } catch (IOException ioe) {
-      LOG.error("IOException: " + ioe.toString());
-      LOG.debug("Setting success to false based on IOException");
-      throw new RuntimeException();
-    }
-  }
-
-  /** Take the outputs from the Mapper, combine all values for the
-   *  same key, and sort them by key.
-   * @param mapOutputs An unordered list of (key, val) pairs from the mapper
-   * @return the sorted list of (key, list(val))'s to present to the reducer
-   */
-  List<Pair<K2, List<V2>>> shuffle(List<Pair<K2, V2>> mapOutputs) {
-    HashMap<K2, List<V2>> reducerInputs = new HashMap<K2, List<V2>>();
-
-    // step 1: condense all values with the same key into a list.
-    for (Pair<K2, V2> mapOutput : mapOutputs) {
-      List<V2> valuesForKey = reducerInputs.get(mapOutput.getFirst());
-
-      if (null == valuesForKey) {
-        // this is the first (k, v) pair for this key. Add it to the list.
-        valuesForKey = new ArrayList<V2>();
-        valuesForKey.add(mapOutput.getSecond());
-        reducerInputs.put(mapOutput.getFirst(), valuesForKey);
-      } else {
-        // add this value to the existing list for this key
-        valuesForKey.add(mapOutput.getSecond());
-      }
-    }
-
-    // build a list out of these (k, list(v)) pairs
-    List<Pair<K2, List<V2>>> finalInputs = new ArrayList<Pair<K2, List<V2>>>();
-    Set<Map.Entry<K2, List<V2>>> entries = reducerInputs.entrySet();
-    for (Map.Entry<K2, List<V2>> entry : entries) {
-      K2 key = entry.getKey();
-      List<V2> vals = entry.getValue();
-      finalInputs.add(new Pair<K2, List<V2>>(key, vals));
-    }
-
-    // and then sort the output list by key
-    if (finalInputs.size() > 0) {
-      Collections.sort(finalInputs,
-              finalInputs.get(0).new FirstElemComparator());
-    }
-
-    return finalInputs;
+  public String toString() {
+    return "MapReduceDriver (" + myMapper + ", " + myReducer + ")";
   }
 }

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.types.Pair;
+
+/**
+ * Harness that allows you to test a Mapper and a Reducer instance together
+ * You provide the input key and value that should be sent to the Mapper, and
+ * outputs you expect to be sent by the Reducer to the collector for those
+ * inputs. By calling runTest(), the harness will deliver the input to the
+ * Mapper, feed the intermediate results to the Reducer (without checking
+ * them), and will check the Reducer's outputs against the expected results.
+ * This is designed to handle a single (k, v)* -> (k, v)* case from the
+ * Mapper/Reducer pair, representing a single unit test.
+ */
+public abstract class MapReduceDriverBase<K1, V1, K2, V2, K3, V3>
+    extends TestDriver<K1, V1, K3, V3> {
+
+  public static final Log LOG = LogFactory.getLog(MapReduceDriverBase.class);
+
+  protected List<Pair<K1, V1>> inputList;
+
+  public MapReduceDriverBase() {
+    inputList = new ArrayList<Pair<K1, V1>>();
+  }
+
+  /**
+   * Adds an input to send to the mapper
+   * @param key
+   * @param val
+   */
+  public void addInput(K1 key, V1 val) {
+    inputList.add(new Pair<K1, V1>(key, val));
+  }
+
+  /**
+   * Adds an input to send to the Mapper
+   * @param input The (k, v) pair to add to the input list.
+   */
+  public void addInput(Pair<K1, V1> input) {
+    if (null == input) {
+      throw new IllegalArgumentException("Null input in addInput()");
+    }
+
+    inputList.add(input);
+  }
+
+  /**
+   * Adds an output (k, v) pair we expect from the Reducer
+   * @param outputRecord The (k, v) pair to add
+   */
+  public void addOutput(Pair<K3, V3> outputRecord) {
+    if (null != outputRecord) {
+      expectedOutputs.add(outputRecord);
+    } else {
+      throw new IllegalArgumentException("Tried to add null outputRecord");
+    }
+  }
+
+  /**
+   * Adds a (k, v) pair we expect as output from the Reducer
+   * @param key
+   * @param val
+   */
+  public void addOutput(K3 key, V3 val) {
+    addOutput(new Pair<K3, V3>(key, val));
+  }
+
+  /**
+   * Expects an input of the form "key \t val"
+   * Forces the Mapper input types to Text.
+   * @param input A string of the form "key \t val". Trims any whitespace.
+   */
+  public void addInputFromString(String input) {
+    if (null == input) {
+      throw new IllegalArgumentException("null input given to setInput");
+    } else {
+      Pair<Text, Text> inputPair = parseTabbedPair(input);
+      if (null != inputPair) {
+        // I know this is not type-safe, but I don't
+        // know a better way to do this.
+        addInput((Pair<K1, V1>) inputPair);
+      } else {
+        throw new IllegalArgumentException("Could not parse input pair in addInput");
+      }
+    }
+  }
+
+  /**
+   * Expects an input of the form "key \t val"
+   * Forces the Reducer output types to Text.
+   * @param output A string of the form "key \t val". Trims any whitespace.
+   */
+  public void addOutputFromString(String output) {
+    if (null == output) {
+      throw new IllegalArgumentException("null input given to setOutput");
+    } else {
+      Pair<Text, Text> outputPair = parseTabbedPair(output);
+      if (null != outputPair) {
+        // I know this is not type-safe,
+        // but I don't know a better way to do this.
+        addOutput((Pair<K3, V3>) outputPair);
+      } else {
+        throw new IllegalArgumentException(
+            "Could not parse output pair in setOutput");
+      }
+    }
+  }
+
+  public abstract List<Pair<K3, V3>> run() throws IOException;
+
+  @Override
+  public void runTest() throws RuntimeException {
+    List<Pair<K3, V3>> reduceOutputs = null;
+    boolean succeeded;
+
+    try {
+      reduceOutputs = run();
+      validate(reduceOutputs);
+    } catch (IOException ioe) {
+      LOG.error("IOException: " + ioe.toString());
+      LOG.debug("Setting success to false based on IOException");
+      throw new RuntimeException();
+    }
+  }
+
+  /** Take the outputs from the Mapper, combine all values for the
+   *  same key, and sort them by key.
+   * @param mapOutputs An unordered list of (key, val) pairs from the mapper
+   * @return the sorted list of (key, list(val))'s to present to the reducer
+   */
+  public List<Pair<K2, List<V2>>> shuffle(List<Pair<K2, V2>> mapOutputs) {
+    HashMap<K2, List<V2>> reducerInputs = new HashMap<K2, List<V2>>();
+
+    // step 1: condense all values with the same key into a list.
+    for (Pair<K2, V2> mapOutput : mapOutputs) {
+      List<V2> valuesForKey = reducerInputs.get(mapOutput.getFirst());
+
+      if (null == valuesForKey) {
+        // this is the first (k, v) pair for this key. Add it to the list.
+        valuesForKey = new ArrayList<V2>();
+        valuesForKey.add(mapOutput.getSecond());
+        reducerInputs.put(mapOutput.getFirst(), valuesForKey);
+      } else {
+        // add this value to the existing list for this key
+        valuesForKey.add(mapOutput.getSecond());
+      }
+    }
+
+    // build a list out of these (k, list(v)) pairs
+    List<Pair<K2, List<V2>>> finalInputs = new ArrayList<Pair<K2, List<V2>>>();
+    Set<Map.Entry<K2, List<V2>>> entries = reducerInputs.entrySet();
+    for (Map.Entry<K2, List<V2>> entry : entries) {
+      K2 key = entry.getKey();
+      List<V2> vals = entry.getValue();
+      finalInputs.add(new Pair<K2, List<V2>>(key, vals));
+    }
+
+    // and then sort the output list by key
+    if (finalInputs.size() > 0) {
+      Collections.sort(finalInputs,
+              finalInputs.get(0).new FirstElemComparator());
+    }
+
+    return finalInputs;
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriver.java?rev=806577&r1=806576&r2=806577&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriver.java Fri Aug 21 14:51:39 2009
@@ -40,22 +40,17 @@
  * (k, v*) -> (k, v)* case from the Reducer, representing a single unit test.
  * Multiple input (k, v*) sets should go in separate unit tests.
  */
-public class ReduceDriver<K1, V1, K2, V2> extends TestDriver<K1, V1, K2, V2> {
+public class ReduceDriver<K1, V1, K2, V2> extends ReduceDriverBase<K1, V1, K2, V2> {
 
   public static final Log LOG = LogFactory.getLog(ReduceDriver.class);
 
   private Reducer<K1, V1, K2, V2> myReducer;
 
-  private K1 inputKey;
-  private List<V1> inputValues;
-
   public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
     myReducer = r;
-    inputValues = new ArrayList<V1>();
   }
 
   public ReduceDriver() {
-    inputValues = new ArrayList<V1>();
   }
 
   /**
@@ -85,14 +80,6 @@
   }
 
   /**
-   * Sets the input key to send to the Reducer
-   *
-   */
-  public void setInputKey(K1 key) {
-    inputKey = key;
-  }
-
-  /**
    * Identical to setInputKey() but with fluent programming style
    *
    * @return this
@@ -103,15 +90,6 @@
   }
 
   /**
-   * adds an input value to send to the reducer
-   *
-   * @param val
-   */
-  public void addInputValue(V1 val) {
-    inputValues.add(val);
-  }
-
-  /**
    * Identical to addInputValue() but with fluent programming style
    *
    * @param val
@@ -123,25 +101,6 @@
   }
 
   /**
-   * Sets the input values to send to the reducer; overwrites existing ones
-   *
-   * @param values
-   */
-  public void setInputValues(List<V1> values) {
-    inputValues.clear();
-    inputValues.addAll(values);
-  }
-
-  /**
-   * Adds a set of input values to send to the reducer
-   *
-   * @param values
-   */
-  public void addInputValues(List<V1> values) {
-    inputValues.addAll(values);
-  }
-
-  /**
    * Identical to addInputValues() but with fluent programming style
    *
    * @param values
@@ -153,16 +112,6 @@
   }
 
   /**
-   * Sets the input to send to the reducer
-   *
-   * @param values
-   */
-  public void setInput(K1 key, List<V1> values) {
-    setInputKey(key);
-    setInputValues(values);
-  }
-
-  /**
    * Identical to setInput() but returns self for fluent programming style
    *
    * @return this
@@ -173,20 +122,6 @@
   }
 
   /**
-   * Adds an output (k, v) pair we expect from the Reducer
-   *
-   * @param outputRecord
-   *          The (k, v) pair to add
-   */
-  public void addOutput(Pair<K2, V2> outputRecord) {
-    if (null != outputRecord) {
-      expectedOutputs.add(outputRecord);
-    } else {
-      throw new IllegalArgumentException("Tried to add null outputRecord");
-    }
-  }
-
-  /**
    * Works like addOutput(), but returns self for fluent style
    *
    * @param outputRecord
@@ -198,16 +133,6 @@
   }
 
   /**
-   * Adds an output (k, v) pair we expect from the Reducer
-   *
-   * @param key The key part of a (k, v) pair to add
-   * @param val The val part of a (k, v) pair to add
-   */
-  public void addOutput(K2 key, V2 val) {
-    addOutput(new Pair<K2, V2>(key, val));
-  }
-
-  /**
    * Works like addOutput(), but returns self for fluent style
    *
    * @param key The key part of a (k, v) pair to add
@@ -220,31 +145,6 @@
   }
 
   /**
-   * Expects an input of the form "key \t val, val, val..." Forces the Reducer
-   * input types to Text.
-   *
-   * @param input
-   *          A string of the form "key \t val,val,val". Trims any whitespace.
-   */
-  public void setInputFromString(String input) {
-    if (null == input) {
-      throw new IllegalArgumentException("null input given to setInputFromString");
-    } else {
-      Pair<Text, Text> inputPair = parseTabbedPair(input);
-      if (null != inputPair) {
-        // I know this is not type-safe, but I don't know a better way to do
-        // this.
-        setInputKey((K1) inputPair.getFirst());
-        setInputValues((List<V1>) parseCommaDelimitedList(inputPair.getSecond()
-                .toString()));
-      } else {
-        throw new IllegalArgumentException(
-            "Could not parse input pair in setInputFromString");
-      }
-    }
-  }
-
-  /**
    * Identical to setInput, but with a fluent programming style
    *
    * @param input
@@ -257,28 +157,6 @@
   }
 
   /**
-   * Expects an input of the form "key \t val" Forces the Reducer output types
-   * to Text.
-   *
-   * @param output
-   *          A string of the form "key \t val". Trims any whitespace.
-   */
-  public void addOutputFromString(String output) {
-    if (null == output) {
-      throw new IllegalArgumentException("null input given to setOutput");
-    } else {
-      Pair<Text, Text> outputPair = parseTabbedPair(output);
-      if (null != outputPair) {
-        // I know this is not type-safe, but I don't know a better way to do
-        // this.
-        addOutput((Pair<K2, V2>) outputPair);
-      } else {
-        throw new IllegalArgumentException("Could not parse output pair in setOutput");
-      }
-    }
-  }
-
-  /**
    * Identical to addOutput, but with a fluent programming style
    *
    * @param output
@@ -292,7 +170,6 @@
 
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
-
     MockOutputCollector<K2, V2> outputCollector =
       new MockOutputCollector<K2, V2>();
     MockReporter reporter = new MockReporter(MockReporter.ReporterType.Reducer);
@@ -305,39 +182,8 @@
   }
 
   @Override
-  public void runTest() throws RuntimeException {
-
-    String inputKeyStr = "(null)";
-
-    if (null != inputKey) {
-      inputKeyStr = inputKey.toString();
-    }
-
-    StringBuilder sb = new StringBuilder();
-    formatValueList(inputValues, sb);
-
-    LOG.debug("Reducing input (" + inputKeyStr + ", " + sb.toString() + ")");
-
-    List<Pair<K2, V2>> outputs = null;
-    try {
-      outputs = run();
-      validate(outputs);
-    } catch (IOException ioe) {
-      LOG.error("IOException in reducer: " + ioe.toString());
-      LOG.debug("Setting success to false based on IOException");
-      throw new RuntimeException();
-    }
-  }
-
-  @Override
   public String toString() {
-    String reducerStr = "null";
-
-    if (null != myReducer) {
-      reducerStr = myReducer.toString();
-    }
-
-    return "ReduceDriver (" + reducerStr + ")";
+    return "ReduceDriver (" + myReducer + ")";
   }
 }
 

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriverBase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriverBase.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriverBase.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriverBase.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mock.MockOutputCollector;
+import org.apache.hadoop.mrunit.mock.MockReporter;
+import org.apache.hadoop.mrunit.types.Pair;
+
+/**
+ * Harness that allows you to test a Reducer instance. You provide a key and a
+ * set of intermediate values for that key that represent inputs that should
+ * be sent to the Reducer (as if they came from a Mapper), and outputs you
+ * expect to be sent by the Reducer to the collector. By calling runTest(),
+ * the harness will deliver the input to the Reducer and will check its
+ * outputs against the expected results. This is designed to handle a single
+ * (k, v*) -> (k, v)* case from the Reducer, representing a single unit test.
+ * Multiple input (k, v*) sets should go in separate unit tests.
+ */
+public abstract class ReduceDriverBase<K1, V1, K2, V2> extends TestDriver<K1, V1, K2, V2> {
+
+  protected K1 inputKey;
+  protected List<V1> inputValues;
+
+  public ReduceDriverBase() {
+    inputValues = new ArrayList<V1>();
+  }
+
+  /**
+   * Sets the input key to send to the Reducer
+   *
+   */
+  public void setInputKey(K1 key) {
+    inputKey = key;
+  }
+
+  /**
+   * adds an input value to send to the reducer
+   *
+   * @param val
+   */
+  public void addInputValue(V1 val) {
+    inputValues.add(val);
+  }
+
+  /**
+   * Sets the input values to send to the reducer; overwrites existing ones
+   *
+   * @param values
+   */
+  public void setInputValues(List<V1> values) {
+    inputValues.clear();
+    inputValues.addAll(values);
+  }
+
+  /**
+   * Adds a set of input values to send to the reducer
+   *
+   * @param values
+   */
+  public void addInputValues(List<V1> values) {
+    inputValues.addAll(values);
+  }
+
+  /**
+   * Sets the input to send to the reducer
+   *
+   * @param values
+   */
+  public void setInput(K1 key, List<V1> values) {
+    setInputKey(key);
+    setInputValues(values);
+  }
+
+  /**
+   * Adds an output (k, v) pair we expect from the Reducer
+   *
+   * @param outputRecord
+   *          The (k, v) pair to add
+   */
+  public void addOutput(Pair<K2, V2> outputRecord) {
+    if (null != outputRecord) {
+      expectedOutputs.add(outputRecord);
+    } else {
+      throw new IllegalArgumentException("Tried to add null outputRecord");
+    }
+  }
+
+  /**
+   * Adds an output (k, v) pair we expect from the Reducer
+   *
+   * @param key The key part of a (k, v) pair to add
+   * @param val The val part of a (k, v) pair to add
+   */
+  public void addOutput(K2 key, V2 val) {
+    addOutput(new Pair<K2, V2>(key, val));
+  }
+
+  /**
+   * Expects an input of the form "key \t val, val, val..." Forces the Reducer
+   * input types to Text.
+   *
+   * @param input
+   *          A string of the form "key \t val,val,val". Trims any whitespace.
+   */
+  public void setInputFromString(String input) {
+    if (null == input) {
+      throw new IllegalArgumentException("null input given to setInputFromString");
+    } else {
+      Pair<Text, Text> inputPair = parseTabbedPair(input);
+      if (null != inputPair) {
+        // I know this is not type-safe, but I don't know a better way to do
+        // this.
+        setInputKey((K1) inputPair.getFirst());
+        setInputValues((List<V1>) parseCommaDelimitedList(inputPair.getSecond()
+                .toString()));
+      } else {
+        throw new IllegalArgumentException(
+            "Could not parse input pair in setInputFromString");
+      }
+    }
+  }
+
+  /**
+   * Expects an input of the form "key \t val" Forces the Reducer output types
+   * to Text.
+   *
+   * @param output
+   *          A string of the form "key \t val". Trims any whitespace.
+   */
+  public void addOutputFromString(String output) {
+    if (null == output) {
+      throw new IllegalArgumentException("null input given to setOutput");
+    } else {
+      Pair<Text, Text> outputPair = parseTabbedPair(output);
+      if (null != outputPair) {
+        // I know this is not type-safe, but I don't know a better way to do
+        // this.
+        addOutput((Pair<K2, V2>) outputPair);
+      } else {
+        throw new IllegalArgumentException("Could not parse output pair in setOutput");
+      }
+    }
+  }
+
+  public abstract List<Pair<K2, V2>> run() throws IOException;
+
+  @Override
+  public void runTest() throws RuntimeException {
+
+    String inputKeyStr = "(null)";
+
+    if (null != inputKey) {
+      inputKeyStr = inputKey.toString();
+    }
+
+    StringBuilder sb = new StringBuilder();
+    formatValueList(inputValues, sb);
+
+    LOG.debug("Reducing input (" + inputKeyStr + ", " + sb.toString() + ")");
+
+    List<Pair<K2, V2>> outputs = null;
+    try {
+      outputs = run();
+      validate(outputs);
+    } catch (IOException ioe) {
+      LOG.error("IOException in reducer: " + ioe.toString());
+      LOG.debug("Setting success to false based on IOException");
+      throw new RuntimeException();
+    }
+  }
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/TestDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/TestDriver.java?rev=806577&r1=806576&r2=806577&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/TestDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/TestDriver.java Fri Aug 21 14:51:39 2009
@@ -70,7 +70,7 @@
    * Split "key \t val" into Pair(Text(key), Text(val))
    * @param tabSeparatedPair
    */
-  static Pair<Text, Text> parseTabbedPair(String tabSeparatedPair) {
+  public static Pair<Text, Text> parseTabbedPair(String tabSeparatedPair) {
 
     String key, val;
 

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mrunit.mapreduce;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mrunit.MapDriverBase;
+import org.apache.hadoop.mrunit.mapreduce.mock.MockMapContextWrapper;
+import org.apache.hadoop.mrunit.types.Pair;
+
+/**
+ * Harness that allows you to test a Mapper instance. You provide the input
+ * key and value that should be sent to the Mapper, and outputs you expect to
+ * be sent by the Mapper to the collector for those inputs. By calling
+ * runTest(), the harness will deliver the input to the Mapper and will check
+ * its outputs against the expected results. This is designed to handle a
+ * single (k, v) -> (k, v)* case from the Mapper, representing a single unit
+ * test. Multiple input (k, v) pairs should go in separate unit tests.
+ */
+public class MapDriver<K1, V1, K2, V2> extends MapDriverBase<K1, V1, K2, V2> {
+
+  public static final Log LOG = LogFactory.getLog(MapDriver.class);
+
+  private Mapper<K1, V1, K2, V2> myMapper;
+
+  public MapDriver(final Mapper<K1, V1, K2, V2> m) {
+    myMapper = m;
+  }
+
+  public MapDriver() {
+  }
+
+
+  /**
+   * Set the Mapper instance to use with this test driver
+   *
+   * @param m the Mapper instance to use
+   */
+  public void setMapper(Mapper<K1, V1, K2, V2> m) {
+    myMapper = m;
+  }
+
+  /** Sets the Mapper instance to use and returns self for fluent style */
+  public MapDriver<K1, V1, K2, V2> withMapper(Mapper<K1, V1, K2, V2> m) {
+    setMapper(m);
+    return this;
+  }
+
+  /**
+   * @return the Mapper object being used by this test
+   */
+  public Mapper<K1, V1, K2, V2> getMapper() {
+    return myMapper;
+  }
+
+  /**
+   * Identical to setInputKey() but with fluent programming style
+   *
+   * @return this
+   */
+  public MapDriver<K1, V1, K2, V2> withInputKey(K1 key) {
+    setInputKey(key);
+    return this;
+  }
+
+  /**
+   * Identical to setInputValue() but with fluent programming style
+   *
+   * @param val
+   * @return this
+   */
+  public MapDriver<K1, V1, K2, V2> withInputValue(V1 val) {
+    setInputValue(val);
+    return this;
+  }
+
+  /**
+   * Identical to setInput() but returns self for fluent programming style
+   *
+   * @return this
+   */
+  public MapDriver<K1, V1, K2, V2> withInput(K1 key, V1 val) {
+    setInput(key, val);
+    return this;
+  }
+
+  /**
+   * Identical to setInput() but returns self for fluent programming style
+   *
+   * @param inputRecord
+   * @return this
+   */
+  public MapDriver<K1, V1, K2, V2> withInput(Pair<K1, V1> inputRecord) {
+    setInput(inputRecord);
+    return this;
+  }
+
+  /**
+   * Works like addOutput(), but returns self for fluent style
+   *
+   * @param outputRecord
+   * @return this
+   */
+  public MapDriver<K1, V1, K2, V2> withOutput(Pair<K2, V2> outputRecord) {
+    addOutput(outputRecord);
+    return this;
+  }
+
+  /**
+   * Functions like addOutput() but returns self for fluent programming
+   * style
+   *
+   * @return this
+   */
+  public MapDriver<K1, V1, K2, V2> withOutput(K2 key, V2 val) {
+    addOutput(key, val);
+    return this;
+  }
+
+  /**
+   * Identical to setInputFromString, but with a fluent programming style
+   *
+   * @param input
+   *          A string of the form "key \t val". Trims any whitespace.
+   * @return this
+   */
+  public MapDriver<K1, V1, K2, V2> withInputFromString(String input) {
+    setInputFromString(input);
+    return this;
+  }
+
+  /**
+   * Identical to addOutputFromString, but with a fluent programming style
+   *
+   * @param output
+   *          A string of the form "key \t val". Trims any whitespace.
+   * @return this
+   */
+  public MapDriver<K1, V1, K2, V2> withOutputFromString(String output) {
+    addOutputFromString(output);
+    return this;
+  }
+
+  @Override
+  public List<Pair<K2, V2>> run() throws IOException {
+    List<Pair<K1, V1>> inputs = new ArrayList<Pair<K1, V1>>();
+    inputs.add(new Pair<K1, V1>(inputKey, inputVal));
+
+    try {
+      MockMapContextWrapper<K1, V1, K2, V2> wrapper = new MockMapContextWrapper();
+      MockMapContextWrapper<K1, V1, K2, V2>.MockMapContext context =
+          wrapper.getMockContext(inputs);
+
+      myMapper.run(context);
+      return context.getOutputs();
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MapDriver (0.20+) (" + myMapper + ")";
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.mapreduce;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mrunit.MapReduceDriverBase;
+import org.apache.hadoop.mrunit.types.Pair;
+
+/**
+ * Harness that allows you to test a Mapper and a Reducer instance together
+ * You provide the input key and value that should be sent to the Mapper, and
+ * outputs you expect to be sent by the Reducer to the collector for those
+ * inputs. By calling runTest(), the harness will deliver the input to the
+ * Mapper, feed the intermediate results to the Reducer (without checking
+ * them), and will check the Reducer's outputs against the expected results.
+ * This is designed to handle a single (k, v)* -> (k, v)* case from the
+ * Mapper/Reducer pair, representing a single unit test.
+ */
+public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
+    extends MapReduceDriverBase<K1, V1, K2, V2, K3, V3> {
+
+  public static final Log LOG = LogFactory.getLog(MapReduceDriver.class);
+
+  private Mapper<K1, V1, K2, V2> myMapper;
+  private Reducer<K2, V2, K3, V3> myReducer;
+
+  public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
+                         final Reducer<K2, V2, K3, V3> r) {
+    myMapper = m;
+    myReducer = r;
+  }
+
+  public MapReduceDriver() {
+  }
+
+  /** Set the Mapper instance to use with this test driver
+   * @param m the Mapper instance to use */
+  public void setMapper(Mapper<K1, V1, K2, V2> m) {
+    myMapper = m;
+  }
+
+  /** Sets the Mapper instance to use and returns self for fluent style */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withMapper(
+          Mapper<K1, V1, K2, V2> m) {
+    setMapper(m);
+    return this;
+  }
+
+  /**
+   * @return the Mapper object being used by this test
+   */
+  public Mapper<K1, V1, K2, V2> getMapper() {
+    return myMapper;
+  }
+
+  /**
+   * Sets the reducer object to use for this test
+   * @param r The reducer object to use
+   */
+  public void setReducer(Reducer<K2, V2, K3, V3> r) {
+    myReducer = r;
+  }
+
+  /**
+   * Identical to setReducer(), but with fluent programming style
+   * @param r The Reducer to use
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withReducer(
+          Reducer<K2, V2, K3, V3> r) {
+    setReducer(r);
+    return this;
+  }
+
+  /**
+   * @return the Reducer object being used for this test
+   */
+  public Reducer<K2, V2, K3, V3> getReducer() {
+    return myReducer;
+  }
+
+  /**
+   * Identical to addInput() but returns self for fluent programming style
+   * @param key
+   * @param val
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withInput(K1 key, V1 val) {
+    addInput(key, val);
+    return this;
+  }
+
+  /**
+   * Identical to addInput() but returns self for fluent programming style
+   * @param input The (k, v) pair to add
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withInput(
+      Pair<K1, V1> input) {
+    addInput(input);
+    return this;
+  }
+
+  /**
+   * Works like addOutput(), but returns self for fluent style
+   * @param outputRecord
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutput(
+          Pair<K3, V3> outputRecord) {
+    addOutput(outputRecord);
+    return this;
+  }
+
+  /**
+   * Functions like addOutput() but returns self for fluent programming style
+   * @param key
+   * @param val
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutput(K3 key, V3 val) {
+    addOutput(key, val);
+    return this;
+  }
+
+  /**
+   * Identical to addInputFromString, but with a fluent programming style
+   * @param input A string of the form "key \t val". Trims any whitespace.
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withInputFromString(String input) {
+    addInputFromString(input);
+    return this;
+  }
+
+  /**
+   * Identical to addOutputFromString, but with a fluent programming style
+   * @param output A string of the form "key \t val". Trims any whitespace.
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputFromString(String output) {
+    addOutputFromString(output);
+    return this;
+  }
+
+  public List<Pair<K3, V3>> run() throws IOException {
+
+    List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();
+
+    // run map component
+    for (Pair<K1, V1> input : inputList) {
+      LOG.debug("Mapping input " + input.toString() + ")");
+
+      mapOutputs.addAll(new MapDriver<K1, V1, K2, V2>(myMapper).withInput(
+              input).run());
+    }
+
+    List<Pair<K2, List<V2>>> reduceInputs = shuffle(mapOutputs);
+    List<Pair<K3, V3>> reduceOutputs = new ArrayList<Pair<K3, V3>>();
+
+    for (Pair<K2, List<V2>> input : reduceInputs) {
+      K2 inputKey = input.getFirst();
+      List<V2> inputValues = input.getSecond();
+      StringBuilder sb = new StringBuilder();
+      formatValueList(inputValues, sb);
+      LOG.debug("Reducing input (" + inputKey.toString() + ", "
+          + sb.toString() + ")");
+
+      reduceOutputs.addAll(new ReduceDriver<K2, V2, K3, V3>(myReducer)
+              .withInputKey(inputKey).withInputValues(inputValues).run());
+    }
+
+    return reduceOutputs;
+  }
+
+  @Override
+  public String toString() {
+    return "MapReduceDriver (0.20+) (" + myMapper + ", " + myReducer + ")";
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mrunit.mapreduce;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mrunit.ReduceDriverBase;
+import org.apache.hadoop.mrunit.mapreduce.mock.MockReduceContextWrapper;
+import org.apache.hadoop.mrunit.types.Pair;
+
+/**
+ * Harness that allows you to test a Reducer instance. You provide a key and a
+ * set of intermediate values for that key that represent inputs that should
+ * be sent to the Reducer (as if they came from a Mapper), and outputs you
+ * expect to be sent by the Reducer to the collector. By calling runTest(),
+ * the harness will deliver the input to the Reducer and will check its
+ * outputs against the expected results. This is designed to handle a single
+ * (k, v*) -> (k, v)* case from the Reducer, representing a single unit test.
+ * Multiple input (k, v*) sets should go in separate unit tests.
+ */
+public class ReduceDriver<K1, V1, K2, V2> extends ReduceDriverBase<K1, V1, K2, V2> {
+
+  public static final Log LOG = LogFactory.getLog(ReduceDriver.class);
+
+  private Reducer<K1, V1, K2, V2> myReducer;
+
+  public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
+    myReducer = r;
+  }
+
+  public ReduceDriver() {
+  }
+
+  /**
+   * Sets the reducer object to use for this test
+   *
+   * @param r
+   *          The reducer object to use
+   */
+  public void setReducer(Reducer<K1, V1, K2, V2> r) {
+    myReducer = r;
+  }
+
+  /**
+   * Identical to setReducer(), but with fluent programming style
+   *
+   * @param r
+   *          The Reducer to use
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withReducer(Reducer<K1, V1, K2, V2> r) {
+    setReducer(r);
+    return this;
+  }
+
+  public Reducer<K1, V1, K2, V2> getReducer() {
+    return myReducer;
+  }
+
+  /**
+   * Identical to setInputKey() but with fluent programming style
+   *
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withInputKey(K1 key) {
+    setInputKey(key);
+    return this;
+  }
+
+  /**
+   * Identical to addInputValue() but with fluent programming style
+   *
+   * @param val
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withInputValue(V1 val) {
+    addInputValue(val);
+    return this;
+  }
+
+  /**
+   * Identical to addInputValues() but with fluent programming style
+   *
+   * @param values
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withInputValues(List<V1> values) {
+    addInputValues(values);
+    return this;
+  }
+
+  /**
+   * Identical to setInput() but returns self for fluent programming style
+   *
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withInput(K1 key, List<V1> values) {
+    setInput(key, values);
+    return this;
+  }
+
+  /**
+   * Works like addOutput(), but returns self for fluent style
+   *
+   * @param outputRecord
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withOutput(Pair<K2, V2> outputRecord) {
+    addOutput(outputRecord);
+    return this;
+  }
+
+  /**
+   * Works like addOutput(), but returns self for fluent style
+   *
+   * @param key The key part of a (k, v) pair to add
+   * @param val The val part of a (k, v) pair to add
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withOutput(K2 key, V2 val) {
+    addOutput(key, val);
+    return this;
+  }
+
+  /**
+   * Identical to setInput, but with a fluent programming style
+   *
+   * @param input
+   *          A string of the form "key \t val". Trims any whitespace.
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withInputFromString(String input) {
+    setInputFromString(input);
+    return this;
+  }
+
+  /**
+   * Identical to addOutput, but with a fluent programming style
+   *
+   * @param output
+   *          A string of the form "key \t val". Trims any whitespace.
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withOutputFromString(String output) {
+    addOutputFromString(output);
+    return this;
+  }
+
+  @Override
+  public List<Pair<K2, V2>> run() throws IOException {
+    List<Pair<K1, List<V1>>> inputs = new ArrayList<Pair<K1, List<V1>>>();
+    inputs.add(new Pair<K1, List<V1>>(inputKey, inputValues));
+
+    try {
+      MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper();
+      MockReduceContextWrapper<K1, V1, K2, V2>.MockReduceContext context =
+          wrapper.getMockContext(inputs);
+
+      myReducer.run(context);
+      return context.getOutputs();
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "ReduceDriver (0.20+) (" + myReducer + ")";
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockInputSplit.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockInputSplit.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockInputSplit.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mrunit.mapreduce.mock;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import java.io.IOException;
+
+/**
+ * Mock implementation of InputSplit that does nothing.
+ */
+public class MockInputSplit extends FileSplit {
+
+  private static final Path MOCK_PATH = new Path("somefile");
+
+  public MockInputSplit() {
+    super(MOCK_PATH, 0, 0, (String []) null);
+  }
+
+  public String toString() {
+    return "MockInputSplit";
+  }
+
+  /**
+   * Return the path object represented by this as a FileSplit.
+   */
+  public static Path getMockPath() {
+    return MOCK_PATH;
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContextWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContextWrapper.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContextWrapper.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContextWrapper.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mrunit.mapreduce.mock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.mrunit.mock.MockOutputCollector;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * o.a.h.mapreduce.Mapper.map() expects to use a Mapper.Context
+ * object as a parameter. We want to override the functionality
+ * of a lot of Context to have it send the results back to us, etc.
+ * But since Mapper.Context is an inner class of Mapper, we need to
+ * put any subclasses of Mapper.Context in a subclass of Mapper.
+ *
+ * This wrapper class exists for that purpose.
+ */
+public class MockMapContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+    extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  public static final Log LOG = LogFactory.getLog(MockMapContextWrapper.class);
+
+  /**
+   * Mock context instance that provides input to and receives output from
+   * the Mapper instance under test.
+   */
+  public class MockMapContext extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {
+
+    private Iterator<Pair<KEYIN, VALUEIN>> inputIter;
+    private Pair<KEYIN, VALUEIN> curInput;
+    private MockOutputCollector<KEYOUT, VALUEOUT> output;
+
+    public MockMapContext(final List<Pair<KEYIN, VALUEIN>> in)
+        throws IOException, InterruptedException {
+
+      super(new Configuration(),
+            new TaskAttemptID("mrunit-jt", 0, TaskType.MAP, 0, 0),
+            null, null, new MockOutputCommitter(), null, null);
+      this.inputIter = in.iterator();
+      this.output = new MockOutputCollector<KEYOUT, VALUEOUT>();
+    }
+
+    @Override
+    public InputSplit getInputSplit() {
+      return new MockInputSplit();
+    }
+
+    @Override
+    public KEYIN getCurrentKey() {
+      return curInput.getFirst();
+    }
+
+    @Override
+    public VALUEIN getCurrentValue() {
+      return curInput.getSecond();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException {
+      if (this.inputIter.hasNext()) {
+        this.curInput = this.inputIter.next();
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    public void write(KEYOUT key, VALUEOUT value) throws IOException {
+      output.collect(key, value);
+    }
+
+    /** This method does nothing in the mock version. */
+    public Counter getCounter(Enum<?> counterName) {
+      return null;
+    }
+
+    @Override
+    /** This method does nothing in the mock version. */
+    public Counter getCounter(String groupName, String counterName) {
+      return null;
+    }
+
+    @Override
+    /** This method does nothing in the mock version. */
+    public void progress() {
+    }
+
+    @Override
+    /** This method does nothing in the mock version. */
+    public void setStatus(String status) {
+    }
+
+    /**
+     * @return the outputs from the MockOutputCollector back to
+     * the test harness.
+     */
+    public List<Pair<KEYOUT, VALUEOUT>> getOutputs() {
+      return output.getOutputs();
+    }
+  }
+
+  public MockMapContext getMockContext(List<Pair<KEYIN, VALUEIN>> inputs)
+      throws IOException, InterruptedException {
+    return new MockMapContext(inputs);
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockOutputCommitter.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockOutputCommitter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockOutputCommitter.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mrunit.mapreduce.mock;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Mock implementation of OutputCommitter that does nothing.
+ */
+public class MockOutputCommitter extends OutputCommitter {
+
+  public void setupJob(JobContext jobContext) {
+  }
+
+  public void cleanupJob(JobContext jobContext) {
+  }
+
+  public void setupTask(TaskAttemptContext taskContext) {
+  }
+
+  public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+    return false;
+  }
+
+  public void commitTask(TaskAttemptContext taskContext) {
+  }
+
+  public void abortTask(TaskAttemptContext taskContext) {
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockRawKeyValueIterator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockRawKeyValueIterator.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockRawKeyValueIterator.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockRawKeyValueIterator.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mrunit.mapreduce.mock;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+
+import java.io.IOException;
+
+/**
+ * Mock implementation of RawKeyValueIterator that does nothing.
+ */
+public class MockRawKeyValueIterator implements RawKeyValueIterator {
+  public DataInputBuffer getKey() {
+    return null;
+  }
+
+  public DataInputBuffer getValue() {
+    return null;
+  }
+
+  public boolean next() {
+    return false;
+  }
+
+  public void close() {
+  }
+
+  public Progress getProgress() {
+    return null;
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mrunit.mapreduce.mock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.mrunit.mock.MockOutputCollector;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * o.a.h.mapreduce.Reducer.reduce() expects to use a Reducer.Context
+ * object as a parameter. We want to override the functionality
+ * of a lot of Context to have it send the results back to us, etc.
+ * But since Reducer.Context is an inner class of Reducer, we need to
+ * put any subclasses of Reducer.Context in a subclass of Reducer.
+ *
+ * This wrapper class exists for that purpose.
+ */
+public class MockReduceContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+    extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  public static final Log LOG = LogFactory.getLog(MockReduceContextWrapper.class);
+
+  /**
+   * Mock context instance that provides input to and receives output from
+   * the Mapper instance under test.
+   */
+  public class MockReduceContext extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {
+
+    // The iterator over the input key, list(val).
+    private Iterator<Pair<KEYIN, List<VALUEIN>>> inputIter;
+
+    // The current key and list of values.
+    private KEYIN curKey;
+    private InspectableIterable curValueIterable;
+
+    private MockOutputCollector<KEYOUT, VALUEOUT> output;
+
+    public MockReduceContext(final List<Pair<KEYIN, List<VALUEIN>>> in)
+        throws IOException, InterruptedException {
+
+      super(new Configuration(),
+            new TaskAttemptID("mrunit-jt", 0, TaskType.REDUCE, 0, 0),
+            new MockRawKeyValueIterator(), null, null,
+            new MockOutputCommitter(), null, null,
+            (Class) Text.class, (Class) Text.class);
+      this.inputIter = in.iterator();
+      this.output = new MockOutputCollector<KEYOUT, VALUEOUT>();
+    }
+
+
+    /**
+     * A private iterable/iterator implementation that wraps around the 
+     * underlying iterable/iterator used by the input value list. This
+     * memorizes the last value we saw so that we can return it in getCurrentValue().
+     */
+    private class InspectableIterable implements Iterable<VALUEIN> {
+      private Iterable<VALUEIN> base;
+      private VALUEIN lastVal;
+
+      public InspectableIterable(final Iterable<VALUEIN> baseCollection) {
+        this.base = baseCollection;
+      }
+
+      public Iterator<VALUEIN> iterator() {
+        return new InspectableIterator(this.base.iterator());
+      }
+
+      public VALUEIN getLastVal() {
+        return lastVal;
+      }
+
+      private class InspectableIterator 
+          extends ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.ValueIterator
+          implements Iterator<VALUEIN> {
+        private Iterator<VALUEIN> iter;
+        public InspectableIterator(final Iterator<VALUEIN> baseIter) {
+          iter = baseIter;
+        }
+
+        public VALUEIN next() {
+          InspectableIterable.this.lastVal = iter.next();
+          return InspectableIterable.this.lastVal;
+        }
+
+        public boolean hasNext() {
+          return iter.hasNext();
+        }
+
+        public void remove() {
+          iter.remove();
+        }
+      }
+    }
+
+    @Override
+    public boolean nextKey() {
+      if (inputIter.hasNext()) {
+        // Advance to the next key and list of values
+        Pair<KEYIN, List<VALUEIN>> p = inputIter.next();
+        curKey = p.getFirst();
+
+        // Reset the value iterator
+        curValueIterable = new InspectableIterable(p.getSecond());
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public boolean nextKeyValue() {
+      return nextKey();
+    }
+
+    @Override
+    public KEYIN getCurrentKey() {
+      return curKey;
+    }
+
+    @Override
+    public VALUEIN getCurrentValue() {
+      return curValueIterable.getLastVal();
+    }
+
+    @Override
+    public Iterable<VALUEIN> getValues() {
+      return curValueIterable;
+    }
+
+    public void write(KEYOUT key, VALUEOUT value) throws IOException {
+      output.collect(key, value);
+    }
+
+    /** This method does nothing in the mock version. */
+    public Counter getCounter(Enum<?> counterName) {
+      return null;
+    }
+
+    @Override
+    /** This method does nothing in the mock version. */
+    public Counter getCounter(String groupName, String counterName) {
+      return null;
+    }
+
+    @Override
+    /** This method does nothing in the mock version. */
+    public void progress() {
+    }
+
+    @Override
+    /** This method does nothing in the mock version. */
+    public void setStatus(String status) {
+    }
+
+    /**
+     * @return the outputs from the MockOutputCollector back to
+     * the test harness.
+     */
+    public List<Pair<KEYOUT, VALUEOUT>> getOutputs() {
+      return output.getOutputs();
+    }
+  }
+
+  public MockReduceContext getMockContext(List<Pair<KEYIN, List<VALUEIN>>> inputs)
+      throws IOException, InterruptedException {
+    return new MockReduceContext(inputs);
+  }
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/AllTests.java?rev=806577&r1=806576&r2=806577&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/AllTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/AllTests.java Fri Aug 21 14:51:39 2009
@@ -43,6 +43,7 @@
     suite.addTestSuite(TestExample.class);
 
     suite.addTest(org.apache.hadoop.mrunit.types.AllTests.suite());
+    suite.addTest(org.apache.hadoop.mrunit.mapreduce.AllTests.suite());
     return suite;
   }
 

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/AllTests.java?rev=806577&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/AllTests.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/AllTests.java Fri Aug 21 14:51:39 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mrunit.mapreduce;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+/**
+ * All tests for the new 0.20+ mapreduce API versions of the test harness.
+ */
+public final class AllTests  {
+
+  private AllTests() { }
+
+  public static Test suite() {
+    TestSuite suite = new TestSuite("Test for org.apache.hadoop.mrunit.mapreduce");
+
+    suite.addTestSuite(TestMapDriver.class);
+    suite.addTestSuite(TestReduceDriver.class);
+    suite.addTestSuite(TestMapReduceDriver.class);
+
+    return suite;
+  }
+
+}
+