You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrunit.apache.org by br...@apache.org on 2012/07/29 18:07:37 UTC
svn commit: r1366869 - in /mrunit/trunk/src:
main/java/org/apache/hadoop/mrunit/
main/java/org/apache/hadoop/mrunit/mapreduce/
test/java/org/apache/hadoop/mrunit/mapreduce/
Author: brock
Date: Sun Jul 29 16:07:36 2012
New Revision: 1366869
URL: http://svn.apache.org/viewvc?rev=1366869&view=rev
Log:
MRUNIT-64: Multiple Input Key, Value Pairs should be supported
Added:
mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestWordCount.java
Modified:
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java Sun Jul 29 16:07:36 2012
@@ -27,13 +27,11 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.types.Pair;
/**
- * Harness that allows you to test a Mapper instance. You provide the input key
- * and value that should be sent to the Mapper, and outputs you expect to be
+ * Harness that allows you to test a Mapper instance. You provide the input
+ * (k, v)* pairs that should be sent to the Mapper, and outputs you expect to be
* sent by the Mapper to the collector for those inputs. By calling runTest(),
* the harness will deliver the input to the Mapper and will check its outputs
- * against the expected results. This is designed to handle a single (k, v) ->
- * (k, v)* case from the Mapper, representing a single unit test. Multiple input
- * (k, v) pairs should go in separate unit tests.
+ * against the expected results.
*/
public abstract class MapDriverBase<K1, V1, K2, V2> extends
TestDriver<K1, V1, K2, V2> {
@@ -41,7 +39,10 @@ public abstract class MapDriverBase<K1,
public static final Log LOG = LogFactory.getLog(MapDriverBase.class);
protected List<Pair<K1, V1>> inputs = new ArrayList<Pair<K1, V1>>();
+
+ @Deprecated
protected K1 inputKey;
+ @Deprecated
protected V1 inputVal;
/**
@@ -49,10 +50,11 @@ public abstract class MapDriverBase<K1,
*
* @param key
*/
+ @Deprecated
public void setInputKey(final K1 key) {
inputKey = copy(key);
}
-
+ @Deprecated
public K1 getInputKey() {
return inputKey;
}
@@ -62,10 +64,11 @@ public abstract class MapDriverBase<K1,
*
* @param val
*/
+ @Deprecated
public void setInputValue(final V1 val) {
inputVal = copy(val);
}
-
+ @Deprecated
public V1 getInputValue() {
return inputVal;
}
@@ -75,8 +78,7 @@ public abstract class MapDriverBase<K1,
*
*/
public void setInput(final K1 key, final V1 val) {
- setInputKey(key);
- setInputValue(val);
+ setInput(new Pair<K1, V1>(key, val));
}
/**
@@ -88,9 +90,63 @@ public abstract class MapDriverBase<K1,
public void setInput(final Pair<K1, V1> inputRecord) {
setInputKey(inputRecord.getFirst());
setInputValue(inputRecord.getSecond());
+
+ clearInput();
+ addInput(inputRecord);
}
/**
+ * Adds an input to send to the mapper
+ *
+ * @param key
+ * @param val
+ */
+ public void addInput(final K1 key, final V1 val) {
+ inputs.add(copyPair(key, val));
+ }
+
+ /**
+ * Adds an input to send to the mapper
+ *
+ * @param input
+ * a (K, V) pair
+ */
+ public void addInput(final Pair<K1, V1> input) {
+ addInput(input.getFirst(), input.getSecond());
+ }
+
+ /**
+ * Adds a list of inputs to send to the mapper
+ *
+ * @param inputs
+ * list of (K, V) pairs
+ */
+ public void addAll(final List<Pair<K1, V1>> inputs) {
+ for (Pair<K1, V1> input : inputs) {
+ addInput(input);
+ }
+ }
+
+ /**
+ * Clears the list of inputs to send to the mapper
+ */
+ public void clearInput() {
+ inputs.clear();
+ }
+
+ /**
+ * Adds output (k, v)* pairs we expect from the Mapper
+ *
+ * @param outputRecords
+ * The (k, v)* pairs to add
+ */
+ public void addAllOutput(final List<Pair<K2, V2>> outputRecords) {
+ for (Pair<K2, V2> output : outputRecords) {
+ addOutput(output);
+ }
+ }
+
+ /**
* Adds an output (k, v) pair we expect from the Mapper
*
* @param outputRecord
@@ -145,7 +201,9 @@ public abstract class MapDriverBase<K1,
@Override
public void runTest(final boolean orderMatters) throws IOException {
- LOG.debug("Mapping input (" + inputKey + ", " + inputVal + ")");
+ for (Pair<K1, V1> input : inputs) {
+ LOG.debug("Mapping input (" + input.getFirst() + ", " + input.getSecond() + ")");
+ }
final List<Pair<K2, V2>> outputs = run();
validate(outputs, orderMatters);
validate(counterWrapper);
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Sun Jul 29 16:07:36 2012
@@ -78,6 +78,30 @@ public abstract class MapReduceDriverBas
}
/**
+ * Adds input to send to the mapper
+ *
+ * @param inputs
+ * List of (k, v) pairs to add to the input list
+ */
+ public void addAll(final List<Pair<K1, V1>> inputs) {
+ for (Pair<K1, V1> input : inputs) {
+ addInput(input);
+ }
+ }
+
+ /**
+ * Adds output (k, v)* pairs we expect from the Reducer
+ *
+ * @param outputRecords
+ * List of (k, v) pairs to add
+ */
+ public void addAllOutput(final List<Pair<K3, V3>> outputRecords) {
+ for (Pair<K3, V3> output : outputRecords) {
+ addOutput(output);
+ }
+ }
+
+ /**
* Adds an output (k, v) pair we expect from the Reducer
*
* @param outputRecord
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java Sun Jul 29 16:07:36 2012
@@ -33,14 +33,15 @@ import org.apache.hadoop.mrunit.types.Pa
* sent to the Reducer (as if they came from a Mapper), and outputs you expect
* to be sent by the Reducer to the collector. By calling runTest(), the harness
* will deliver the input to the Reducer and will check its outputs against the
- * expected results. This is designed to handle a single (k, v*) -> (k, v)* case
- * from the Reducer, representing a single unit test. Multiple input (k, v*)
- * sets should go in separate unit tests.
+ * expected results.
*/
public abstract class ReduceDriverBase<K1, V1, K2, V2> extends
TestDriver<K1, V1, K2, V2> {
+ protected List<Pair<K1, List<V1>>> inputs = new ArrayList<Pair<K1, List<V1>>>();
+ @Deprecated
protected K1 inputKey;
+ @Deprecated
private final List<V1> inputValues;
public ReduceDriverBase() {
@@ -48,20 +49,20 @@ public abstract class ReduceDriverBase<K
}
/**
- * Returns a list which when iterated over, returns the same instance of the
- * value each time with different contents similar to how Hadoop currently
- * works with Writables.
+ * Returns a list of values.
*
* @return List of values
*/
+ @Deprecated
public List<V1> getInputValues() {
- return new ValueClassInstanceReuseList<V1>(inputValues, getConfiguration());
+ return inputValues;
}
/**
* Sets the input key to send to the Reducer
*
*/
+ @Deprecated
public void setInputKey(final K1 key) {
inputKey = copy(key);
}
@@ -71,6 +72,7 @@ public abstract class ReduceDriverBase<K
*
* @param val
*/
+ @Deprecated
public void addInputValue(final V1 val) {
inputValues.add(copy(val));
}
@@ -80,6 +82,7 @@ public abstract class ReduceDriverBase<K
*
* @param values
*/
+ @Deprecated
public void setInputValues(final List<V1> values) {
inputValues.clear();
addInputValues(values);
@@ -90,6 +93,7 @@ public abstract class ReduceDriverBase<K
*
* @param values
*/
+ @Deprecated
public void addInputValues(final List<V1> values) {
for (V1 value : values) {
addInputValue(value);
@@ -105,8 +109,70 @@ public abstract class ReduceDriverBase<K
public void setInput(final K1 key, final List<V1> values) {
setInputKey(key);
setInputValues(values);
+
+ clearInput();
+ addInput(key, values);
+ }
+
+ /**
+ * Clears the input to be sent to the Reducer
+ */
+ public void clearInput() {
+ inputs.clear();
}
+
+ /**
+ * Add input (K, V*) to send to the Reducer
+ *
+ * @param key
+ * The key to add
+ * @param values
+ * The list of values to add
+ */
+ public void addInput(final K1 key, final List<V1> values) {
+ List<V1> copyVals = new ArrayList<V1>();
+ for (V1 val : values) {
+ copyVals.add(copy(val));
+ }
+ inputs.add(new Pair<K1, List<V1>>(copy(key),
+ new ValueClassInstanceReuseList<V1>(copyVals, getConfiguration())));
+ }
+
+ /**
+ * Add input (K, V*) to send to the Reducer
+ *
+ * @param input
+ * input pair
+ */
+ public void addInput(final Pair<K1, List<V1>> input) {
+ addInput(input.getFirst(), input.getSecond());
+ }
+
+ /**
+ * Adds input to send to the Reducer
+ *
+ * @param inputs
+ * list of (K, V*) pairs
+ */
+ public void addAll(final List<Pair<K1, List<V1>>> inputs) {
+ for (Pair<K1, List<V1>> input : inputs) {
+ addInput(input);
+ }
+ }
+
+ /**
+ * Adds output (k, v)* pairs we expect from the Reducer
+ *
+ * @param outputRecords
+ * The (k, v)* pairs to add
+ */
+ public void addAllOutput(final List<Pair<K2, V2>> outputRecords) {
+ for (Pair<K2, V2> output : outputRecords) {
+ addOutput(output);
+ }
+ }
+
/**
* Adds an output (k, v) pair we expect from the Reducer
*
@@ -167,10 +233,14 @@ public abstract class ReduceDriverBase<K
@Override
public void runTest(final boolean orderMatters) throws IOException {
- final StringBuilder sb = new StringBuilder();
- formatValueList(inputValues, sb);
-
- LOG.debug("Reducing input (" + inputKey + ", " + sb + ")");
+ if (LOG.isDebugEnabled()) {
+ final StringBuilder sb = new StringBuilder();
+ for (Pair<K1, List<V1>> input : inputs) {
+ formatValueList(input.getSecond(), sb);
+ LOG.debug("Reducing input (" + input.getFirst() + ", " + sb + ")");
+ sb.delete(0, sb.length());
+ }
+ }
List<Pair<K2, V2>> outputs = null;
outputs = run();
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java Sun Jul 29 16:07:36 2012
@@ -39,13 +39,11 @@ import org.apache.hadoop.mrunit.internal
import org.apache.hadoop.mrunit.types.Pair;
/**
- * Harness that allows you to test a Mapper instance. You provide the input key
- * and value that should be sent to the Mapper, and outputs you expect to be
+ * Harness that allows you to test a Mapper instance. You provide the input
+ * (k, v)* pairs that should be sent to the Mapper, and outputs you expect to be
* sent by the Mapper to the collector for those inputs. By calling runTest(),
* the harness will deliver the input to the Mapper and will check its outputs
- * against the expected results. This is designed to handle a single (k, v) ->
- * (k, v)* case from the Mapper, representing a single unit test. Multiple input
- * (k, v) pairs should go in separate unit tests.
+ * against the expected results.
*/
public class MapDriver<K1, V1, K2, V2>
extends MapDriverBase<K1, V1, K2, V2> implements ContextDriver {
@@ -120,6 +118,7 @@ extends MapDriverBase<K1, V1, K2, V2> im
*
* @return this
*/
+ @Deprecated
public MapDriver<K1, V1, K2, V2> withInputKey(final K1 key) {
setInputKey(key);
return this;
@@ -131,6 +130,7 @@ extends MapDriverBase<K1, V1, K2, V2> im
* @param val
* @return this
*/
+ @Deprecated
public MapDriver<K1, V1, K2, V2> withInputValue(final V1 val) {
setInputValue(val);
return this;
@@ -158,6 +158,17 @@ extends MapDriverBase<K1, V1, K2, V2> im
}
/**
+ * Identical to addAll() but returns self for fluent programming style
+ *
+ * @param inputRecords
+ * @return this
+ */
+ public MapDriver<K1, V1, K2, V2> withAll(final List<Pair<K1, V1>> inputRecords) {
+ addAll(inputRecords);
+ return this;
+ }
+
+ /**
* Works like addOutput(), but returns self for fluent style
*
* @param outputRecord
@@ -179,6 +190,18 @@ extends MapDriverBase<K1, V1, K2, V2> im
}
/**
+ * Functions like addAllOutput() but returns self for fluent programming style
+ *
+ * @param outputRecords
+ * @return this
+ */
+ public MapDriver<K1, V1, K2, V2> withAllOutput(
+ final List<Pair<K2, V2>> outputRecords) {
+ addAllOutput(outputRecords);
+ return this;
+ }
+
+ /**
* Identical to setInputFromString, but with a fluent programming style
*
* @param input
@@ -224,15 +247,18 @@ extends MapDriverBase<K1, V1, K2, V2> im
@Override
public List<Pair<K2, V2>> run() throws IOException {
- if (inputKey == null || inputVal == null) {
+ // handle inputKey and inputVal for backwards compatibility
+ if (inputKey != null && inputVal != null) {
+ setInput(inputKey, inputVal);
+ }
+
+ if (inputs == null || inputs.size() == 0) {
throw new IllegalStateException("No input was provided");
}
+
if (myMapper == null) {
throw new IllegalStateException("No Mapper class was provided");
- }
-
- inputs.clear();
- inputs.add(new Pair<K1, V1>(inputKey, inputVal));
+ }
try {
myMapper.run(wrapper.getMockContext());
@@ -262,7 +288,7 @@ extends MapDriverBase<K1, V1, K2, V2> im
/**
* @param mapInputPath
* The Path object which will be given to the mapper
- * @return
+ * @return this object for fluent coding
*/
public MapDriver<K1, V1, K2, V2> withMapInputPath(Path mapInputPath) {
setMapInputPath(mapInputPath);
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java Sun Jul 29 16:07:36 2012
@@ -214,6 +214,31 @@ public class MapReduceDriver<K1, V1, K2,
}
/**
+ * Identical to addAll() but returns self for fluent programming style
+ *
+ * @param inputs
+ * List of (k, v) pairs to add
+ * @return this
+ */
+ public MapReduceDriver<K1, V1, K2, V2, K3, V3> withAll(
+ final List<Pair<K1, V1>> inputs) {
+ addAll(inputs);
+ return this;
+ }
+
+ /**
+ * Works like addAllOutput(), but returns self for fluent style
+ *
+ * @param outputRecords
+ * @return this
+ */
+ public MapReduceDriver<K1, V1, K2, V2, K3, V3> withAllOutput(
+ final List<Pair<K3, V3>> outputRecords) {
+ addAllOutput(outputRecords);
+ return this;
+ }
+
+ /**
* Works like addOutput(), but returns self for fluent style
*
* @param outputRecord
@@ -296,25 +321,29 @@ public class MapReduceDriver<K1, V1, K2,
final List<Pair<OUTKEY, OUTVAL>> reduceOutputs = new ArrayList<Pair<OUTKEY, OUTVAL>>();
- for (final Pair<K2, List<V2>> input : inputs) {
- final K2 inputKey = input.getFirst();
- final List<V2> inputValues = input.getSecond();
- final StringBuilder sb = new StringBuilder();
- formatValueList(inputValues, sb);
- LOG.debug("Reducing input (" + inputKey.toString() + ", "
- + sb.toString() + ")");
+ if (!inputs.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ final StringBuilder sb = new StringBuilder();
+ for (Pair<K2, List<V2>> input : inputs) {
+ formatValueList(input.getSecond(), sb);
+ LOG.debug("Reducing input (" + input.getFirst() + ", " + sb + ")");
+ sb.delete(0, sb.length());
+ }
+ }
final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
.newReduceDriver(reducer).withCounters(getCounters())
- .withConfiguration(configuration).withInputKey(inputKey)
- .withInputValues(inputValues);
+ .withConfiguration(configuration).withAll(inputs);
+
if (getOutputCopyingOrInputFormatConfiguration() != null) {
reduceDriver
.withOutputCopyingOrInputFormatConfiguration(getOutputCopyingOrInputFormatConfiguration());
}
+
if (outputFormatClass != null) {
reduceDriver.withOutputFormat(outputFormatClass, inputFormatClass);
}
+
reduceOutputs.addAll(reduceDriver.run());
}
@@ -337,13 +366,10 @@ public class MapReduceDriver<K1, V1, K2,
List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();
// run map component
- for (final Pair<K1, V1> input : inputList) {
- LOG.debug("Mapping input " + input.toString() + ")");
-
- mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
- .withCounters(getCounters()).withConfiguration(configuration)
- .withInput(input).withMapInputPath(getMapInputPath()).run());
- }
+ LOG.debug("Starting map phase with mapper: " + myMapper);
+ mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
+ .withCounters(getCounters()).withConfiguration(configuration)
+ .withAll(inputList).withMapInputPath(getMapInputPath()).run());
if (myCombiner != null) {
// User has specified a combiner. Run this and replace the mapper outputs
@@ -379,7 +405,7 @@ public class MapReduceDriver<K1, V1, K2,
/**
* @param mapInputPath
* The Path object which will be given to the mapper
- * @return
+ * @return this
*/
public MapReduceDriver<K1, V1, K2, V2, K3, V3> withMapInputPath(Path mapInputPath) {
setMapInputPath(mapInputPath);
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java Sun Jul 29 16:07:36 2012
@@ -18,10 +18,9 @@
package org.apache.hadoop.mrunit.mapreduce;
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -44,9 +43,7 @@ import org.apache.hadoop.mrunit.types.Pa
* sent to the Reducer (as if they came from a Mapper), and outputs you expect
* to be sent by the Reducer to the collector. By calling runTest(), the harness
* will deliver the input to the Reducer and will check its outputs against the
- * expected results. This is designed to handle a single (k, v*) -> (k, v)* case
- * from the Reducer, representing a single unit test. Multiple input (k, v*)
- * sets should go in separate unit tests.
+ * expected results.
*/
public class ReduceDriver<K1, V1, K2, V2> extends
ReduceDriverBase<K1, V1, K2, V2> implements ContextDriver {
@@ -57,7 +54,6 @@ public class ReduceDriver<K1, V1, K2, V2
private Counters counters;
private final MockOutputCreator<K2, V2> mockOutputCreator = new MockOutputCreator<K2, V2>();
- private final List<Pair<K1, List<V1>>> inputs = new ArrayList<Pair<K1, List<V1>>>();
private final MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
inputs, mockOutputCreator, this);
@@ -126,6 +122,7 @@ public class ReduceDriver<K1, V1, K2, V2
*
* @return this
*/
+ @Deprecated
public ReduceDriver<K1, V1, K2, V2> withInputKey(final K1 key) {
setInputKey(key);
return this;
@@ -137,6 +134,7 @@ public class ReduceDriver<K1, V1, K2, V2
* @param val
* @return this
*/
+ @Deprecated
public ReduceDriver<K1, V1, K2, V2> withInputValue(final V1 val) {
addInputValue(val);
return this;
@@ -148,6 +146,7 @@ public class ReduceDriver<K1, V1, K2, V2
* @param values
* @return this
*/
+ @Deprecated
public ReduceDriver<K1, V1, K2, V2> withInputValues(final List<V1> values) {
addInputValues(values);
return this;
@@ -168,6 +167,29 @@ public class ReduceDriver<K1, V1, K2, V2
}
/**
+ * Identical to addInput() but returns self for fluent programming style
+ *
+ * @param input
+ * @return this
+ */
+ public ReduceDriver<K1, V1, K2, V2> withInput(final Pair<K1, List<V1>> input) {
+ addInput(input);
+ return this;
+ }
+
+ /**
+ * Identical to addAll() but returns self for fluent programming style
+ *
+ * @param inputs
+ * @return this
+ */
+ public ReduceDriver<K1, V1, K2, V2> withAll(
+ final List<Pair<K1, List<V1>>> inputs) {
+ addAll(inputs);
+ return this;
+ }
+
+ /**
* Works like addOutput(), but returns self for fluent style
*
* @param outputRecord
@@ -193,6 +215,18 @@ public class ReduceDriver<K1, V1, K2, V2
}
/**
+ * Works like addAllOutput(), but returns self for fluent style
+ *
+ * @param outputRecords
+ * @return this
+ */
+ public ReduceDriver<K1, V1, K2, V2> withAllOutput(
+ final List<Pair<K2, V2>> outputRecords) {
+ addAllOutput(outputRecords);
+ return this;
+ }
+
+ /**
* Identical to setInput, but with a fluent programming style
*
* @param input
@@ -238,16 +272,20 @@ public class ReduceDriver<K1, V1, K2, V2
@Override
public List<Pair<K2, V2>> run() throws IOException {
- if (inputKey == null || getInputValues().isEmpty()) {
+ // handle inputKey and inputValues for backwards compatibility
+ if (inputKey != null && !getInputValues().isEmpty()) {
+ clearInput();
+ addInput(inputKey, getInputValues());
+ }
+
+ if (inputs.isEmpty()) {
throw new IllegalStateException("No input was provided");
}
+
if (myReducer == null) {
throw new IllegalStateException("No Reducer class was provided");
}
- inputs.clear();
- inputs.add(new Pair<K1, List<V1>>(inputKey, getInputValues()));
-
try {
myReducer.run(wrapper.getMockContext());
return wrapper.getOutputs();
Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java Sun Jul 29 16:07:36 2012
@@ -179,6 +179,19 @@ public class TestMapDriver {
}
@Test
+ public void testAddAll() throws IOException {
+ final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+ inputs.add(new Pair<Text, Text>(new Text("foo"), new Text("bar")));
+ inputs.add(new Pair<Text, Text>(new Text("foo"), new Text("bar")));
+
+ final List<Pair<Text, Text>> outputs = new ArrayList<Pair<Text, Text>>();
+ outputs.add(new Pair<Text, Text>(new Text("foo"), new Text("bar")));
+ outputs.add(new Pair<Text, Text>(new Text("foo"), new Text("bar")));
+
+ driver.withAll(inputs).withAllOutput(outputs).runTest();
+ }
+
+ @Test
public void testSetInput() {
driver.setInput(new Pair<Text, Text>(new Text("foo"), new Text("bar")));
Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java Sun Jul 29 16:07:36 2012
@@ -125,6 +125,20 @@ public class TestMapReduceDriver {
}
@Test
+ public void testAddAll() throws IOException {
+ final List<Pair<Text, LongWritable>> inputs = new ArrayList<Pair<Text, LongWritable>>();
+ inputs.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(FOO_IN_A)));
+ inputs.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(FOO_IN_B)));
+ inputs.add(new Pair<Text, LongWritable>(new Text("bar"), new LongWritable(BAR_IN)));
+
+ final List<Pair<Text, LongWritable>> outputs = new ArrayList<Pair<Text, LongWritable>>();
+ outputs.add(new Pair<Text, LongWritable>(new Text("bar"), new LongWritable(BAR_IN)));
+ outputs.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(FOO_OUT)));
+
+ driver.withAll(inputs).withAllOutput(outputs).runTest();
+ }
+
+ @Test
public void testTestRun3OrderInsensitive() throws IOException {
driver.withInput(new Text("foo"), new LongWritable(FOO_IN_A))
.withInput(new Text("bar"), new LongWritable(BAR_IN))
Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java?rev=1366869&r1=1366868&r2=1366869&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java Sun Jul 29 16:07:36 2012
@@ -234,6 +234,22 @@ public class TestReduceDriver {
}
@Test
+ public void testAddAll() throws IOException {
+ final List<LongWritable> vals = new ArrayList<LongWritable>();
+ vals.add(new LongWritable(IN_A));
+ vals.add(new LongWritable(IN_B));
+
+ final List<Pair<Text, List<LongWritable>>> inputs = new ArrayList<Pair<Text, List<LongWritable>>>();
+ inputs.add(new Pair<Text, List<LongWritable>>(new Text("foo"), vals));
+
+ final List<Pair<Text, LongWritable>> expected = new ArrayList<Pair<Text, LongWritable>>();
+ expected.add(new Pair<Text, LongWritable>(new Text("foo"),
+ new LongWritable(OUT_VAL)));
+
+ driver.withAll(inputs).withAllOutput(expected).runTest();
+ }
+
+ @Test
public void testNoInput() throws IOException {
driver = ReduceDriver.newReduceDriver();
thrown.expectMessage(IllegalStateException.class, "No input was provided");
Added: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestWordCount.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestWordCount.java?rev=1366869&view=auto
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestWordCount.java (added)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestWordCount.java Sun Jul 29 16:07:36 2012
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestWordCount {
+ private static final String FILE01 = "Hello World Bye World";
+ private static final String FILE02 = "Hello Hadoop Goodbye Hadoop";
+ private static final int ONE = 1;
+ private static final int TWO = 2;
+
+ private Mapper<LongWritable, Text, Text, IntWritable> mapper;
+ private Reducer<Text, IntWritable, Text, IntWritable> reducer;
+ private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver;
+ private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
+ private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
+ private List<Pair<Text, IntWritable>> expectedOutput;
+
+ @Before
+ public void setup() {
+ mapper = new Map();
+ reducer = new Reduce();
+ driver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
+ mapDriver = MapDriver.newMapDriver(mapper);
+ reduceDriver = ReduceDriver.newReduceDriver(reducer);
+
+ expectedOutput = new ArrayList<Pair<Text, IntWritable>>();
+ expectedOutput.add(new Pair<Text, IntWritable>(new Text("Bye"), new IntWritable(ONE)));
+ expectedOutput.add(new Pair<Text, IntWritable>(new Text("Goodbye"), new IntWritable(ONE)));
+ expectedOutput.add(new Pair<Text, IntWritable>(new Text("Hadoop"), new IntWritable(TWO)));
+ expectedOutput.add(new Pair<Text, IntWritable>(new Text("Hello"), new IntWritable(TWO)));
+ expectedOutput.add(new Pair<Text, IntWritable>(new Text("World"), new IntWritable(TWO)));
+ }
+
+ @Test
+ public void TestMapDriver() throws IOException {
+ final List<Pair<LongWritable, Text>> inputs = new ArrayList<Pair<LongWritable, Text>>();
+ inputs.add(new Pair<LongWritable, Text>(new LongWritable(21), new Text(FILE01)));
+ inputs.add(new Pair<LongWritable, Text>(new LongWritable(48), new Text(FILE02)));
+
+ final List<Pair<Text, IntWritable>> outputs = new ArrayList<Pair<Text, IntWritable>>();
+ outputs.add(new Pair<Text, IntWritable>(new Text("Hello"), new IntWritable(ONE)));
+ outputs.add(new Pair<Text, IntWritable>(new Text("World"), new IntWritable(ONE)));
+ outputs.add(new Pair<Text, IntWritable>(new Text("Bye"), new IntWritable(ONE)));
+ outputs.add(new Pair<Text, IntWritable>(new Text("World"), new IntWritable(ONE)));
+ outputs.add(new Pair<Text, IntWritable>(new Text("Hello"), new IntWritable(ONE)));
+ outputs.add(new Pair<Text, IntWritable>(new Text("Hadoop"), new IntWritable(ONE)));
+ outputs.add(new Pair<Text, IntWritable>(new Text("Goodbye"), new IntWritable(ONE)));
+ outputs.add(new Pair<Text, IntWritable>(new Text("Hadoop"), new IntWritable(ONE)));
+
+ mapDriver.withAll(inputs).withAllOutput(outputs).runTest(true);
+ }
+
+ @Test
+ public void TestReduceDriver() throws IOException {
+ final List<IntWritable> input1 = new ArrayList<IntWritable>();
+ input1.add(new IntWritable(ONE));
+
+ final List<IntWritable> input2 = new ArrayList<IntWritable>();
+ input2.add(new IntWritable(ONE));
+ input2.add(new IntWritable(ONE));
+
+ final List<Pair<Text, List<IntWritable>>> inputs = new ArrayList<Pair<Text, List<IntWritable>>>();
+ inputs.add(new Pair<Text, List<IntWritable>>(new Text("Bye"), input1));
+ inputs.add(new Pair<Text, List<IntWritable>>(new Text("Goodbye"), input1));
+ inputs.add(new Pair<Text, List<IntWritable>>(new Text("Hadoop"), input2));
+ inputs.add(new Pair<Text, List<IntWritable>>(new Text("Hello"), input2));
+ inputs.add(new Pair<Text, List<IntWritable>>(new Text("World"), input2));
+
+ reduceDriver.withAll(inputs).withAllOutput(expectedOutput).runTest(true);
+ }
+
+ @Test
+ public void TestRun() throws IOException {
+ final List<Pair<LongWritable, Text>> inputs = new ArrayList<Pair<LongWritable, Text>>();
+ inputs.add(new Pair<LongWritable, Text>(new LongWritable(21), new Text(FILE01)));
+ inputs.add(new Pair<LongWritable, Text>(new LongWritable(48), new Text(FILE02)));
+
+ driver.withAll(inputs).withAllOutput(expectedOutput).runTest(true);
+ }
+
+ /**
+ * Word count mapper
+ */
+ public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
+ private final IntWritable one = new IntWritable(1);
+ private Text word = new Text();
+
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+
+ String line = value.toString();
+ StringTokenizer tokenizer = new StringTokenizer(line);
+
+ while (tokenizer.hasMoreTokens()) {
+ word.set(tokenizer.nextToken());
+ context.write(word, one);
+ }
+ }
+ }
+
+ /**
+ * Word count reducer
+ */
+ public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
+ public void reduce(Text key, Iterable<IntWritable> values, Context context)
+ throws IOException, InterruptedException {
+
+ int sum = 0;
+
+ for (IntWritable val : values) {
+ sum += val.get();
+ }
+
+ context.write(key, new IntWritable(sum));
+ }
+ }
+
+}
\ No newline at end of file