You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrunit.apache.org by jd...@apache.org on 2012/04/17 05:37:10 UTC

svn commit: r1326898 - in /incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit: ./ mapreduce/ mock/

Author: jdonofrio
Date: Tue Apr 17 03:37:10 2012
New Revision: 1326898

URL: http://svn.apache.org/viewvc?rev=1326898&view=rev
Log:
MRUNIT-95: use the Serialization framework so that all set/add input/output methods serialize and then deserialize their arguments, making defensive copies

Modified:
    incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
    incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
    incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
    incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
    incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java
    incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/Serialization.java
    incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
    incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
    incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mock/MockOutputCollector.java

Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java Tue Apr 17 03:37:10 2012
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.mrunit;
 
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
-
 import java.io.IOException;
 import java.util.List;
 
@@ -50,7 +48,7 @@ public abstract class MapDriverBase<K1, 
    * @param key
    */
   public void setInputKey(final K1 key) {
-    inputKey = returnNonNull(key);
+    inputKey = copy(key);
   }
 
   public K1 getInputKey() {
@@ -63,7 +61,7 @@ public abstract class MapDriverBase<K1, 
    * @param val
    */
   public void setInputValue(final V1 val) {
-    inputVal = returnNonNull(val);
+    inputVal = copy(val);
   }
 
   public V1 getInputValue() {
@@ -97,7 +95,7 @@ public abstract class MapDriverBase<K1, 
    *          The (k, v) pair to add
    */
   public void addOutput(final Pair<K2, V2> outputRecord) {
-    expectedOutputs.add(returnNonNull(outputRecord));
+    addOutput(outputRecord.getFirst(), outputRecord.getSecond());
   }
 
   /**
@@ -105,7 +103,7 @@ public abstract class MapDriverBase<K1, 
    * 
    */
   public void addOutput(final K2 key, final V2 val) {
-    addOutput(new Pair<K2, V2>(key, val));
+    expectedOutputs.add(copyPair(key, val));
   }
 
   /**

Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java Tue Apr 17 03:37:10 2012
@@ -310,8 +310,9 @@ public class MapReduceDriver<K1, V1, K2,
     for (final Pair<K1, V1> input : inputList) {
       LOG.debug("Mapping input " + input.toString() + ")");
 
-      mapOutputs.addAll(MapDriver.newMapDriver(myMapper).withInput(input)
-          .withCounters(getCounters()).withConfiguration(configuration).run());
+      mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
+          .withConfiguration(configuration).withCounters(getCounters())
+          .withInput(input).run());
     }
 
     if (myCombiner != null) {

Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Tue Apr 17 03:37:10 2012
@@ -50,7 +50,7 @@ public abstract class MapReduceDriverBas
 
   public static final Log LOG = LogFactory.getLog(MapReduceDriverBase.class);
 
-  protected List<Pair<K1, V1>> inputList;
+  protected List<Pair<K1, V1>> inputList = new ArrayList<Pair<K1, V1>>();
 
   /** Key group comparator */
   protected Comparator<K2> keyGroupComparator;
@@ -58,10 +58,6 @@ public abstract class MapReduceDriverBas
   /** Key value order comparator */
   protected Comparator<K2> keyValueOrderComparator;
 
-  public MapReduceDriverBase() {
-    inputList = new ArrayList<Pair<K1, V1>>();
-  }
-
   /**
    * Adds an input to send to the mapper
    * 
@@ -69,7 +65,7 @@ public abstract class MapReduceDriverBas
    * @param val
    */
   public void addInput(final K1 key, final V1 val) {
-    inputList.add(new Pair<K1, V1>(key, val));
+    inputList.add(copyPair(key, val));
   }
 
   /**
@@ -79,7 +75,7 @@ public abstract class MapReduceDriverBas
    *          The (k, v) pair to add to the input list.
    */
   public void addInput(final Pair<K1, V1> input) {
-    inputList.add(returnNonNull(input));
+    addInput(input.getFirst(), input.getSecond());
   }
 
   /**
@@ -89,7 +85,7 @@ public abstract class MapReduceDriverBas
    *          The (k, v) pair to add
    */
   public void addOutput(final Pair<K3, V3> outputRecord) {
-    expectedOutputs.add(returnNonNull(outputRecord));
+    addOutput(outputRecord.getFirst(), outputRecord.getSecond());
   }
 
   /**
@@ -99,7 +95,7 @@ public abstract class MapReduceDriverBas
    * @param val
    */
   public void addOutput(final K3 key, final V3 val) {
-    addOutput(new Pair<K3, V3>(key, val));
+    expectedOutputs.add(copyPair(key, val));
   }
 
   /**

Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java Tue Apr 17 03:37:10 2012
@@ -163,7 +163,7 @@ public class PipelineMapReduceDriver<K1,
    * @param val
    */
   public void addInput(final K1 key, final V1 val) {
-    inputList.add(new Pair<K1, V1>(key, val));
+    inputList.add(copyPair(key, val));
   }
 
   /**
@@ -186,7 +186,7 @@ public class PipelineMapReduceDriver<K1,
    *          The (k, v) pair to add to the input list.
    */
   public void addInput(final Pair<K1, V1> input) {
-    inputList.add(returnNonNull(input));
+    addInput(input.getFirst(), input.getSecond());
   }
 
   /**
@@ -209,7 +209,7 @@ public class PipelineMapReduceDriver<K1,
    *          The (k, v) pair to add
    */
   public void addOutput(final Pair<K2, V2> outputRecord) {
-    expectedOutputs.add(returnNonNull(outputRecord));
+    addOutput(outputRecord.getFirst(), outputRecord.getSecond());
   }
 
   /**
@@ -231,7 +231,7 @@ public class PipelineMapReduceDriver<K1,
    * @param val
    */
   public void addOutput(final K2 key, final V2 val) {
-    addOutput(new Pair<K2, V2>(key, val));
+    expectedOutputs.add(copyPair(key, val));
   }
 
   /**

Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriverBase.java Tue Apr 17 03:37:10 2012
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.mrunit;
 
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -64,7 +62,7 @@ public abstract class ReduceDriverBase<K
    * 
    */
   public void setInputKey(final K1 key) {
-    inputKey = returnNonNull(key);
+    inputKey = copy(key);
   }
 
   /**
@@ -73,7 +71,7 @@ public abstract class ReduceDriverBase<K
    * @param val
    */
   public void addInputValue(final V1 val) {
-    inputValues.add(returnNonNull(val));
+    inputValues.add(copy(val));
   }
 
   /**
@@ -92,7 +90,9 @@ public abstract class ReduceDriverBase<K
    * @param values
    */
   public void addInputValues(final List<V1> values) {
-    inputValues.addAll(returnNonNull(values));
+    for (V1 value : values) {
+      addInputValue(value);
+    }
   }
 
   /**
@@ -113,7 +113,7 @@ public abstract class ReduceDriverBase<K
    *          The (k, v) pair to add
    */
   public void addOutput(final Pair<K2, V2> outputRecord) {
-    expectedOutputs.add(returnNonNull(outputRecord));
+    addOutput(outputRecord.getFirst(), outputRecord.getSecond());
   }
 
   /**
@@ -125,7 +125,7 @@ public abstract class ReduceDriverBase<K
    *          The val part of a (k, v) pair to add
    */
   public void addOutput(final K2 key, final V2 val) {
-    addOutput(new Pair<K2, V2>(key, val));
+    expectedOutputs.add(copyPair(key, val));
   }
 
   /**
@@ -210,7 +210,7 @@ public abstract class ReduceDriverBase<K
         @Override
         public T next() {
           final T next = iterator.next();
-          value = (T) serialization.copy(next, value);
+          value = serialization.copy(next, value);
           return value;
         }
 

Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/Serialization.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/Serialization.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/Serialization.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/Serialization.java Tue Apr 17 03:37:10 2012
@@ -28,8 +28,11 @@ import org.apache.hadoop.io.serializer.S
 
 public class Serialization {
 
-  private final SerializationFactory serializationFactory;
+  private SerializationFactory serializationFactory;
 
+  /**
+   * @param conf
+   */
   public Serialization(Configuration conf) {
     serializationFactory = new SerializationFactory(conf);
   }
@@ -47,7 +50,7 @@ public class Serialization {
    * @return a copy of the orig object
    */
   @SuppressWarnings("unchecked")
-  public Object copy(final Object orig, final Object copy) {
+  public <T> T copy(final T orig, final T copy) {
     if (copy != null && orig.getClass() != copy.getClass()) {
       throw new IllegalArgumentException(orig.getClass() + " != "
           + copy.getClass());
@@ -62,7 +65,8 @@ public class Serialization {
           .getDeserializer(clazz);
     } catch (NullPointerException e) {
       throw new IllegalStateException(
-          "No applicable class implementing Serialization in conf at io.serializations");
+          "No applicable class implementing Serialization in conf at io.serializations for "
+              + orig.getClass(), e);
     }
     try {
       final DataOutputBuffer outputBuffer = new DataOutputBuffer();
@@ -71,7 +75,7 @@ public class Serialization {
       final DataInputBuffer inputBuffer = new DataInputBuffer();
       inputBuffer.reset(outputBuffer.getData(), outputBuffer.getLength());
       deserializer.open(inputBuffer);
-      return deserializer.deserialize(copy);
+      return (T) deserializer.deserialize(copy);
     } catch (final IOException e) {
       throw new RuntimeException(e);
     }
@@ -83,8 +87,21 @@ public class Serialization {
    * @param orig
    * @return a new copy of the orig object
    */
-  public Object copy(final Object orig) {
+  public <T> T copy(final T orig) {
     return copy(orig, null);
   }
 
+  /**
+   * Creates a new copy of the orig object
+   * 
+   * @param orig
+   * @param conf
+   *          new Configuration object to use
+   * @return a new copy of the orig object
+   */
+  public <T> T copyWithConf(final T orig, final Configuration conf) {
+    serializationFactory = new SerializationFactory(conf);
+    return copy(orig);
+  }
+
 }

Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java Tue Apr 17 03:37:10 2012
@@ -46,11 +46,14 @@ public abstract class TestDriver<K1, V1,
 
   protected CounterWrapper counterWrapper;
 
+  protected Serialization serialization;
+
   public TestDriver() {
     expectedOutputs = new ArrayList<Pair<K2, V2>>();
     expectedEnumCounters = new ArrayList<Pair<Enum, Long>>();
     expectedStringCounters = new ArrayList<Pair<Pair<String, String>, Long>>();
     configuration = new Configuration();
+    serialization = new Serialization(configuration);
   }
 
   /**
@@ -123,6 +126,23 @@ public abstract class TestDriver<K1, V1,
   }
 
   /**
+   * @return The configuration object that will be given to the mapper and/or
+   *         reducer associated with the driver
+   */
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  /**
+   * @param configuration
+   *          The configuration object that will be given to the mapper and/or
+   *          reducer associated with the driver
+   */
+  public void setConfiguration(final Configuration configuration) {
+    this.configuration = returnNonNull(configuration);
+  }
+
+  /**
    * Runs the test but returns the result set instead of validating it (ignores
    * any addOutput(), etc calls made before this)
    * 
@@ -189,6 +209,14 @@ public abstract class TestDriver<K1, V1,
     return outList;
   }
 
+  protected <T> T copy(T object) {
+    return serialization.copyWithConf(object, configuration);
+  }
+
+  protected <S, T> Pair<S, T> copyPair(S first, T second) {
+    return new Pair<S, T>(copy(first), copy(second));
+  }
+
   /**
    * check the outputs against the expected inputs in record
    * 
@@ -419,21 +447,4 @@ public abstract class TestDriver<K1, V1,
 
     sb.append(")");
   }
-
-  /**
-   * @return The configuration object that will given to the mapper and/or
-   *         reducer associated with the driver (new API only)
-   */
-  public Configuration getConfiguration() {
-    return configuration;
-  }
-
-  /**
-   * @param configuration
-   *          The configuration object that will given to the mapper and/or
-   *          reducer associated with the driver (new API only)
-   */
-  public void setConfiguration(final Configuration configuration) {
-    this.configuration = returnNonNull(configuration);
-  }
 }

Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java Tue Apr 17 03:37:10 2012
@@ -308,8 +308,9 @@ public class MapReduceDriver<K1, V1, K2,
     for (final Pair<K1, V1> input : inputList) {
       LOG.debug("Mapping input " + input.toString() + ")");
 
-      mapOutputs.addAll(MapDriver.newMapDriver(myMapper).withInput(input)
-          .withCounters(getCounters()).withConfiguration(configuration).run());
+      mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
+          .withCounters(getCounters()).withConfiguration(configuration)
+          .withInput(input).run());
     }
 
     if (myCombiner != null) {

Modified: incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mock/MockOutputCollector.java
URL: http://svn.apache.org/viewvc/incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mock/MockOutputCollector.java?rev=1326898&r1=1326897&r2=1326898&view=diff
==============================================================================
--- incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mock/MockOutputCollector.java (original)
+++ incubator/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mock/MockOutputCollector.java Tue Apr 17 03:37:10 2012
@@ -47,8 +47,8 @@ public class MockOutputCollector<K, V> i
   @Override
   @SuppressWarnings("unchecked")
   public void collect(final K key, final V value) throws IOException {
-    collectedOutputs.add(new Pair<K, V>((K) serialization.copy(key),
-        (V) serialization.copy(value)));
+    collectedOutputs.add(new Pair<K, V>(serialization.copy(key), serialization
+        .copy(value)));
   }
 
   /**