You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrunit.apache.org by jd...@apache.org on 2012/04/17 05:37:10 UTC
svn commit: r1326898 - in
/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit: ./
mapreduce/ mock/
Author: jdonofrio
Date: Tue Apr 17 03:37:10 2012
New Revision: 1326898
URL: http://svn.apache.org/viewvc?rev=1326898&view=rev
Log:
MRUNIT-95: use Serialization framework to change all set/add input/output methods to serialize, deserialize to do defensive copying
Modified:
incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java
incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/Serialization.java
incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mock/MockOutputCollector.java
Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java Tue Apr 17 03:37:10 2012
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.mrunit;
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
-
import java.io.IOException;
import java.util.List;
@@ -50,7 +48,7 @@ public abstract class MapDriverBase<K1,
* @param key
*/
public void setInputKey(final K1 key) {
- inputKey = returnNonNull(key);
+ inputKey = copy(key);
}
public K1 getInputKey() {
@@ -63,7 +61,7 @@ public abstract class MapDriverBase<K1,
* @param val
*/
public void setInputValue(final V1 val) {
- inputVal = returnNonNull(val);
+ inputVal = copy(val);
}
public V1 getInputValue() {
@@ -97,7 +95,7 @@ public abstract class MapDriverBase<K1,
* The (k, v) pair to add
*/
public void addOutput(final Pair<K2, V2> outputRecord) {
- expectedOutputs.add(returnNonNull(outputRecord));
+ addOutput(outputRecord.getFirst(), outputRecord.getSecond());
}
/**
@@ -105,7 +103,7 @@ public abstract class MapDriverBase<K1,
*
*/
public void addOutput(final K2 key, final V2 val) {
- addOutput(new Pair<K2, V2>(key, val));
+ expectedOutputs.add(copyPair(key, val));
}
/**
Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java Tue Apr 17 03:37:10 2012
@@ -310,8 +310,9 @@ public class MapReduceDriver<K1, V1, K2,
for (final Pair<K1, V1> input : inputList) {
LOG.debug("Mapping input " + input.toString() + ")");
- mapOutputs.addAll(MapDriver.newMapDriver(myMapper).withInput(input)
- .withCounters(getCounters()).withConfiguration(configuration).run());
+ mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
+ .withConfiguration(configuration).withCounters(getCounters())
+ .withInput(input).run());
}
if (myCombiner != null) {
Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Tue Apr 17 03:37:10 2012
@@ -50,7 +50,7 @@ public abstract class MapReduceDriverBas
public static final Log LOG = LogFactory.getLog(MapReduceDriverBase.class);
- protected List<Pair<K1, V1>> inputList;
+ protected List<Pair<K1, V1>> inputList = new ArrayList<Pair<K1, V1>>();
/** Key group comparator */
protected Comparator<K2> keyGroupComparator;
@@ -58,10 +58,6 @@ public abstract class MapReduceDriverBas
/** Key value order comparator */
protected Comparator<K2> keyValueOrderComparator;
- public MapReduceDriverBase() {
- inputList = new ArrayList<Pair<K1, V1>>();
- }
-
/**
* Adds an input to send to the mapper
*
@@ -69,7 +65,7 @@ public abstract class MapReduceDriverBas
* @param val
*/
public void addInput(final K1 key, final V1 val) {
- inputList.add(new Pair<K1, V1>(key, val));
+ inputList.add(copyPair(key, val));
}
/**
@@ -79,7 +75,7 @@ public abstract class MapReduceDriverBas
* The (k, v) pair to add to the input list.
*/
public void addInput(final Pair<K1, V1> input) {
- inputList.add(returnNonNull(input));
+ addInput(input.getFirst(), input.getSecond());
}
/**
@@ -89,7 +85,7 @@ public abstract class MapReduceDriverBas
* The (k, v) pair to add
*/
public void addOutput(final Pair<K3, V3> outputRecord) {
- expectedOutputs.add(returnNonNull(outputRecord));
+ addOutput(outputRecord.getFirst(), outputRecord.getSecond());
}
/**
@@ -99,7 +95,7 @@ public abstract class MapReduceDriverBas
* @param val
*/
public void addOutput(final K3 key, final V3 val) {
- addOutput(new Pair<K3, V3>(key, val));
+ expectedOutputs.add(copyPair(key, val));
}
/**
Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java Tue Apr 17 03:37:10 2012
@@ -163,7 +163,7 @@ public class PipelineMapReduceDriver<K1,
* @param val
*/
public void addInput(final K1 key, final V1 val) {
- inputList.add(new Pair<K1, V1>(key, val));
+ inputList.add(copyPair(key, val));
}
/**
@@ -186,7 +186,7 @@ public class PipelineMapReduceDriver<K1,
* The (k, v) pair to add to the input list.
*/
public void addInput(final Pair<K1, V1> input) {
- inputList.add(returnNonNull(input));
+ addInput(input.getFirst(), input.getSecond());
}
/**
@@ -209,7 +209,7 @@ public class PipelineMapReduceDriver<K1,
* The (k, v) pair to add
*/
public void addOutput(final Pair<K2, V2> outputRecord) {
- expectedOutputs.add(returnNonNull(outputRecord));
+ addOutput(outputRecord.getFirst(), outputRecord.getSecond());
}
/**
@@ -231,7 +231,7 @@ public class PipelineMapReduceDriver<K1,
* @param val
*/
public void addOutput(final K2 key, final V2 val) {
- addOutput(new Pair<K2, V2>(key, val));
+ expectedOutputs.add(copyPair(key, val));
}
/**
Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java Tue Apr 17 03:37:10 2012
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.mrunit;
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -64,7 +62,7 @@ public abstract class ReduceDriverBase<K
*
*/
public void setInputKey(final K1 key) {
- inputKey = returnNonNull(key);
+ inputKey = copy(key);
}
/**
@@ -73,7 +71,7 @@ public abstract class ReduceDriverBase<K
* @param val
*/
public void addInputValue(final V1 val) {
- inputValues.add(returnNonNull(val));
+ inputValues.add(copy(val));
}
/**
@@ -92,7 +90,9 @@ public abstract class ReduceDriverBase<K
* @param values
*/
public void addInputValues(final List<V1> values) {
- inputValues.addAll(returnNonNull(values));
+ for (V1 value : values) {
+ addInputValue(value);
+ }
}
/**
@@ -113,7 +113,7 @@ public abstract class ReduceDriverBase<K
* The (k, v) pair to add
*/
public void addOutput(final Pair<K2, V2> outputRecord) {
- expectedOutputs.add(returnNonNull(outputRecord));
+ addOutput(outputRecord.getFirst(), outputRecord.getSecond());
}
/**
@@ -125,7 +125,7 @@ public abstract class ReduceDriverBase<K
* The val part of a (k, v) pair to add
*/
public void addOutput(final K2 key, final V2 val) {
- addOutput(new Pair<K2, V2>(key, val));
+ expectedOutputs.add(copyPair(key, val));
}
/**
@@ -210,7 +210,7 @@ public abstract class ReduceDriverBase<K
@Override
public T next() {
final T next = iterator.next();
- value = (T) serialization.copy(next, value);
+ value = serialization.copy(next, value);
return value;
}
Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/Serialization.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/Serialization.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/Serialization.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/Serialization.java Tue Apr 17 03:37:10 2012
@@ -28,8 +28,11 @@ import org.apache.hadoop.io.serializer.S
public class Serialization {
- private final SerializationFactory serializationFactory;
+ private SerializationFactory serializationFactory;
+ /**
+ * @param conf
+ */
public Serialization(Configuration conf) {
serializationFactory = new SerializationFactory(conf);
}
@@ -47,7 +50,7 @@ public class Serialization {
* @return a copy of the orig object
*/
@SuppressWarnings("unchecked")
- public Object copy(final Object orig, final Object copy) {
+ public <T> T copy(final T orig, final T copy) {
if (copy != null && orig.getClass() != copy.getClass()) {
throw new IllegalArgumentException(orig.getClass() + " != "
+ copy.getClass());
@@ -62,7 +65,8 @@ public class Serialization {
.getDeserializer(clazz);
} catch (NullPointerException e) {
throw new IllegalStateException(
- "No applicable class implementing Serialization in conf at io.serializations");
+ "No applicable class implementing Serialization in conf at io.serializations for "
+ + orig.getClass(), e);
}
try {
final DataOutputBuffer outputBuffer = new DataOutputBuffer();
@@ -71,7 +75,7 @@ public class Serialization {
final DataInputBuffer inputBuffer = new DataInputBuffer();
inputBuffer.reset(outputBuffer.getData(), outputBuffer.getLength());
deserializer.open(inputBuffer);
- return deserializer.deserialize(copy);
+ return (T) deserializer.deserialize(copy);
} catch (final IOException e) {
throw new RuntimeException(e);
}
@@ -83,8 +87,21 @@ public class Serialization {
* @param orig
* @return a new copy of the orig object
*/
- public Object copy(final Object orig) {
+ public <T> T copy(final T orig) {
return copy(orig, null);
}
+ /**
+ * Creates a new copy of the orig object
+ *
+ * @param orig
+ * @param conf
+ * new Configuration object to use
+ * @return a new copy of the orig object
+ */
+ public <T> T copyWithConf(final T orig, final Configuration conf) {
+ serializationFactory = new SerializationFactory(conf);
+ return copy(orig);
+ }
+
}
Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java Tue Apr 17 03:37:10 2012
@@ -46,11 +46,14 @@ public abstract class TestDriver<K1, V1,
protected CounterWrapper counterWrapper;
+ protected Serialization serialization;
+
public TestDriver() {
expectedOutputs = new ArrayList<Pair<K2, V2>>();
expectedEnumCounters = new ArrayList<Pair<Enum, Long>>();
expectedStringCounters = new ArrayList<Pair<Pair<String, String>, Long>>();
configuration = new Configuration();
+ serialization = new Serialization(configuration);
}
/**
@@ -123,6 +126,23 @@ public abstract class TestDriver<K1, V1,
}
/**
+ * @return The configuration object that will given to the mapper and/or
+ * reducer associated with the driver
+ */
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ /**
+ * @param configuration
+ * The configuration object that will given to the mapper and/or
+ * reducer associated with the driver
+ */
+ public void setConfiguration(final Configuration configuration) {
+ this.configuration = returnNonNull(configuration);
+ }
+
+ /**
* Runs the test but returns the result set instead of validating it (ignores
* any addOutput(), etc calls made before this)
*
@@ -189,6 +209,14 @@ public abstract class TestDriver<K1, V1,
return outList;
}
+ protected <T> T copy(T object) {
+ return serialization.copyWithConf(object, configuration);
+ }
+
+ protected <S, T> Pair<S, T> copyPair(S first, T second) {
+ return new Pair<S, T>(copy(first), copy(second));
+ }
+
/**
* check the outputs against the expected inputs in record
*
@@ -419,21 +447,4 @@ public abstract class TestDriver<K1, V1,
sb.append(")");
}
-
- /**
- * @return The configuration object that will given to the mapper and/or
- * reducer associated with the driver (new API only)
- */
- public Configuration getConfiguration() {
- return configuration;
- }
-
- /**
- * @param configuration
- * The configuration object that will given to the mapper and/or
- * reducer associated with the driver (new API only)
- */
- public void setConfiguration(final Configuration configuration) {
- this.configuration = returnNonNull(configuration);
- }
}
Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java Tue Apr 17 03:37:10 2012
@@ -308,8 +308,9 @@ public class MapReduceDriver<K1, V1, K2,
for (final Pair<K1, V1> input : inputList) {
LOG.debug("Mapping input " + input.toString() + ")");
- mapOutputs.addAll(MapDriver.newMapDriver(myMapper).withInput(input)
- .withCounters(getCounters()).withConfiguration(configuration).run());
+ mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
+ .withCounters(getCounters()).withConfiguration(configuration)
+ .withInput(input).run());
}
if (myCombiner != null) {
Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mock/MockOutputCollector.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mock/MockOutputCollector.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mock/MockOutputCollector.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mock/MockOutputCollector.java Tue Apr 17 03:37:10 2012
@@ -47,8 +47,8 @@ public class MockOutputCollector<K, V> i
@Override
@SuppressWarnings("unchecked")
public void collect(final K key, final V value) throws IOException {
- collectedOutputs.add(new Pair<K, V>((K) serialization.copy(key),
- (V) serialization.copy(value)));
+ collectedOutputs.add(new Pair<K, V>(serialization.copy(key), serialization
+ .copy(value)));
}
/**