You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrunit.apache.org by br...@apache.org on 2013/01/07 21:02:58 UTC

git commit: MRUNIT-161: some distributed cache apis not working - JobContext getCacheArchives()

Updated Branches:
  refs/heads/trunk c6977354d -> 5bc108643


MRUNIT-161: some distributed cache apis not working - JobContext getCacheArchives()


Project: http://git-wip-us.apache.org/repos/asf/mrunit/repo
Commit: http://git-wip-us.apache.org/repos/asf/mrunit/commit/5bc10864
Tree: http://git-wip-us.apache.org/repos/asf/mrunit/tree/5bc10864
Diff: http://git-wip-us.apache.org/repos/asf/mrunit/diff/5bc10864

Branch: refs/heads/trunk
Commit: 5bc1086432700b27cb1a012bd3e3449f68b1174d
Parents: c697735
Author: Brock Noland <br...@apache.org>
Authored: Mon Jan 7 14:01:10 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Mon Jan 7 14:01:10 2013 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/mrunit/MapReduceDriver.java  |    4 +-
 .../hadoop/mrunit/PipelineMapReduceDriver.java     |    2 +-
 .../java/org/apache/hadoop/mrunit/TestDriver.java  |   45 ++++++++++++--
 .../mapreduce/AbstractMockContextWrapper.java      |   45 ++++++++++++++-
 .../internal/mapreduce/MockMapContextWrapper.java  |    7 ++-
 .../mapreduce/MockReduceContextWrapper.java        |    4 +-
 .../apache/hadoop/mrunit/mapreduce/MapDriver.java  |   20 +++++-
 .../hadoop/mrunit/mapreduce/MapReduceDriver.java   |    4 +-
 .../hadoop/mrunit/mapreduce/ReduceDriver.java      |   20 +++++-
 .../mrunit/mapreduce/TestDistributedCache.java     |    1 -
 10 files changed, 127 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mrunit/blob/5bc10864/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
index 3a5d069..fb56010 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
@@ -226,7 +226,7 @@ public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
 
         final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
             .newReduceDriver(reducer).withCounters(getCounters())
-            .withConfiguration(configuration).withAll(inputs);
+            .withConfiguration(getConfiguration()).withAll(inputs);
 
         if (getOutputSerializationConfiguration() != null) {
           reduceDriver
@@ -254,7 +254,7 @@ public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
       // run map component
       LOG.debug("Starting map phase with mapper: " + myMapper);
       mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
-          .withCounters(getCounters()).withConfiguration(configuration)
+          .withCounters(getCounters()).withConfiguration(getConfiguration())
           .withAll(inputList).withMapInputPath(getMapInputPath()).run());
 
       if (myCombiner != null) {

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5bc10864/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
index 9dd7086..96b6b75 100644
--- a/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
@@ -302,7 +302,7 @@ public class PipelineMapReduceDriver<K1, V1, K2, V2> extends
           job.getFirst(), job.getSecond());
 
       mrDriver.setCounters(getCounters());
-      mrDriver.setConfiguration(configuration);
+      mrDriver.setConfiguration(getConfiguration());
       mrDriver.setMapInputPath(mapInputPath);
 
       // Add the inputs from the user, or from the previous stage of the

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5bc10864/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
index 6d0db44..15fd682 100644
--- a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
@@ -51,19 +51,28 @@ public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2
   private boolean strictCountersChecking = false;
   protected List<Pair<Enum<?>, Long>> expectedEnumCounters;
   protected List<Pair<Pair<String, String>, Long>> expectedStringCounters;
-  protected Configuration configuration;
+  /**
+   * Configuration object, do not use directly, always use the
+   * getter as it lazily creates the object in the case
+   * the setConfiguration() method will be used by the user.
+   */
+  private Configuration configuration;
+  /**
+   * Serialization object, do not use directly, always use the
+   * getter as it lazily creates the object in the case
+   * the setConfiguration() method will be used by the user.
+   */
+  private Serialization serialization;
+  
   private Configuration outputSerializationConfiguration;
   private File tmpDistCacheDir;
   protected CounterWrapper counterWrapper;
 
-  protected Serialization serialization;
 
   public TestDriver() {
     expectedOutputs = new ArrayList<Pair<K2, V2>>();
     expectedEnumCounters = new ArrayList<Pair<Enum<?>, Long>>();
     expectedStringCounters = new ArrayList<Pair<Pair<String, String>, Long>>();
-    configuration = new Configuration();
-    serialization = new Serialization(configuration);
   }
 
   /**
@@ -252,14 +261,23 @@ public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2
    *         reducer associated with the driver
    */
   public Configuration getConfiguration() {
+    if(configuration == null) {
+      configuration = new Configuration();
+    }
     return configuration;
   }
 
   /**
    * @param configuration
    *          The configuration object that will given to the mapper and/or
-   *          reducer associated with the driver
+   *          reducer associated with the driver. This method should only be
+   *          called directly after the constructor as the internal state
+   *          of the driver depends on the configuration object
+   * @deprecated
+   *          Use getConfiguration() to set configuration items as opposed to
+   *          overriding the entire configuration object as it's used internally.
    */
+  @Deprecated
   public void setConfiguration(final Configuration configuration) {
     this.configuration = returnNonNull(configuration);
   }
@@ -267,9 +285,15 @@ public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2
   /**
    * @param configuration
    *          The configuration object that will given to the mapper associated
-   *          with the driver
+   *          with the driver. This method should only be called directly after 
+   *          the constructor as the internal state of the driver depends on the 
+   *          configuration object
+   * @deprecated
+   *          Use getConfiguration() to set configuration items as opposed to
+   *          overriding the entire configuration object as it's used internally.
    * @return this object for fluent coding
    */
+  @Deprecated
   public T withConfiguration(
       final Configuration configuration) {
     setConfiguration(configuration);
@@ -435,6 +459,13 @@ public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2
     return outputs;
   }
 
+  private Serialization getSerialization() {
+    if(serialization == null) {
+      serialization = new Serialization(getConfiguration());
+    }
+    return serialization;
+  }
+  
   /**
    * Initialises the test distributed cache if required. This
    * process is referred to as "localizing" by Hadoop, but since
@@ -574,7 +605,7 @@ public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2
   }
 
   protected <E> E copy(E object) {
-    return serialization.copyWithConf(object, configuration);
+    return getSerialization().copyWithConf(object, getConfiguration());
   }
 
   protected <S, E> Pair<S, E> copyPair(S first, E second) {

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5bc10864/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
index 43d0b53..be8beb2 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
@@ -21,12 +21,15 @@ import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
 import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
 import org.apache.hadoop.mrunit.types.Pair;
@@ -38,10 +41,13 @@ extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> {
 
   protected CONTEXT context;
   protected final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator;
+  protected final Configuration configuration;
   protected OutputCollectable<KEYOUT, VALUEOUT> outputCollectable;
 
-  public AbstractMockContextWrapper(final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator) {
+  public AbstractMockContextWrapper(final Configuration configuration, 
+      final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator) {
     this.mockOutputCreator = mockOutputCreator;
+    this.configuration = configuration;
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -88,6 +94,43 @@ extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> {
           return null;
         }
       }).when(context).write(any(), any());
+      
+      
+      when(context.getSymlink()).thenAnswer(new Answer<Boolean>() {
+        @Override
+        @SuppressWarnings("deprecation")
+        public Boolean answer(InvocationOnMock invocation) throws Throwable {
+          return DistributedCache.getSymlink(configuration);
+        }      
+      });
+      when(context.getCacheArchives()).thenAnswer(new Answer<URI[]>() {
+        @Override
+        @SuppressWarnings("deprecation")
+        public URI[] answer(InvocationOnMock invocation) throws Throwable {
+          return DistributedCache.getCacheArchives(configuration);
+        }      
+      });
+      when(context.getCacheFiles()).thenAnswer(new Answer<URI[]>() {
+        @Override
+        @SuppressWarnings("deprecation")
+        public URI[] answer(InvocationOnMock invocation) throws Throwable {
+          return DistributedCache.getCacheFiles(configuration);
+        }      
+      });
+      when(context.getLocalCacheArchives()).thenAnswer(new Answer<Path[]>() {
+        @Override
+        @SuppressWarnings("deprecation")
+        public Path[] answer(InvocationOnMock invocation) throws Throwable {
+          return DistributedCache.getLocalCacheArchives(configuration);
+        }      
+      });
+      when(context.getLocalCacheFiles()).thenAnswer(new Answer<Path[]>() {
+        @Override
+        @SuppressWarnings("deprecation")
+        public Path[] answer(InvocationOnMock invocation) throws Throwable {
+          return DistributedCache.getLocalCacheFiles(configuration);
+        }      
+      });
     } catch (IOException e) {
       throw new RuntimeException(e);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5bc10864/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
index 0febe85..72a903e 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -55,10 +56,12 @@ public class MockMapContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
   
   protected Pair<KEYIN, VALUEIN> currentKeyValue;
   
-  public MockMapContextWrapper(final List<Pair<KEYIN, VALUEIN>> inputs,
+  public MockMapContextWrapper(
+      final Configuration configuration,
+      final List<Pair<KEYIN, VALUEIN>> inputs,
       final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator,
       final MapDriver<KEYIN, VALUEIN, KEYOUT, VALUEOUT> driver) {
-    super(mockOutputCreator);
+    super(configuration, mockOutputCreator);
     this.inputs = inputs;
     this.driver = driver;
     context = create();

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5bc10864/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
index 4996074..0bdfd6a 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
@@ -56,10 +57,11 @@ public class MockReduceContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
   protected Pair<KEYIN, List<VALUEIN>> currentKeyValue;
   
   public MockReduceContextWrapper(
+      final Configuration configuration,
       final List<Pair<KEYIN, List<VALUEIN>>> inputs, 
       final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator,
       final ReduceDriver<KEYIN, VALUEIN, KEYOUT, VALUEOUT> driver) {
-    super(mockOutputCreator);
+    super(configuration, mockOutputCreator);
     this.inputs = inputs;
     this.driver = driver;
     context = create();

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5bc10864/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
index 84f22c1..8f7b0a8 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
@@ -49,9 +49,12 @@ extends MapDriverBase<K1, V1, K2, V2, MapDriver<K1, V1, K2, V2> > implements Con
 
   private Mapper<K1, V1, K2, V2> myMapper;
   private Counters counters;
-
-  private final MockMapContextWrapper<K1, V1, K2, V2> wrapper = new MockMapContextWrapper<K1, V1, K2, V2>(
-      inputs, mockOutputCreator,  this);
+  /**
+   * Context creator, do not use directly, always use the
+   * getter as it lazily creates the object in the case
+   * the setConfiguration() method will be used by the user.
+   */
+  private MockMapContextWrapper<K1, V1, K2, V2> wrapper;
 
 
   public MapDriver(final Mapper<K1, V1, K2, V2> m) {
@@ -130,6 +133,7 @@ extends MapDriverBase<K1, V1, K2, V2, MapDriver<K1, V1, K2, V2> > implements Con
     try {
       preRunChecks(myMapper);
       initDistributedCache();
+      MockMapContextWrapper<K1, V1, K2, V2> wrapper = getContextWrapper();
       myMapper.run(wrapper.getMockContext());
       return wrapper.getOutputs();
     } catch (final InterruptedException ie) {
@@ -144,6 +148,14 @@ extends MapDriverBase<K1, V1, K2, V2, MapDriver<K1, V1, K2, V2> > implements Con
     return "MapDriver (0.20+) (" + myMapper + ")";
   }
   
+  private MockMapContextWrapper<K1, V1, K2, V2> getContextWrapper() {
+    if(wrapper == null) {
+      wrapper = new MockMapContextWrapper<K1, V1, K2, V2>(getConfiguration(),
+          inputs, mockOutputCreator,  this);
+    }
+    return wrapper;
+  }
+  
   /**
    * <p>Obtain Context object for furthering mocking with Mockito.
    * For example, causing write() to throw an exception:</p>
@@ -169,7 +181,7 @@ extends MapDriverBase<K1, V1, K2, V2, MapDriver<K1, V1, K2, V2> > implements Con
    * @return the mocked context
    */
   public Mapper<K1, V1, K2, V2>.Context getContext() {
-    return wrapper.getMockContext();
+    return getContextWrapper().getMockContext();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5bc10864/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
index cf2abcc..78941c2 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
@@ -224,7 +224,7 @@ public class MapReduceDriver<K1, V1, K2, V2, K3, V3> extends
 
         final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
             .newReduceDriver(reducer).withCounters(getCounters())
-            .withConfiguration(configuration).withAll(inputs);
+            .withConfiguration(getConfiguration()).withAll(inputs);
 
         if (getOutputSerializationConfiguration() != null) {
           reduceDriver
@@ -251,7 +251,7 @@ public class MapReduceDriver<K1, V1, K2, V2, K3, V3> extends
       // run map component
       LOG.debug("Starting map phase with mapper: " + myMapper);
       mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
-          .withCounters(getCounters()).withConfiguration(configuration)
+          .withCounters(getCounters()).withConfiguration(getConfiguration())
           .withAll(inputList).withMapInputPath(getMapInputPath()).run());
       if (myCombiner != null) {
         // User has specified a combiner. Run this and replace the mapper outputs

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5bc10864/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
index 4996450..c7a15b7 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
@@ -51,9 +51,12 @@ public class ReduceDriver<K1, V1, K2, V2> extends
 
   private Reducer<K1, V1, K2, V2> myReducer;
   private Counters counters;
-
-  private final MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
-      inputs, mockOutputCreator, this);
+  /**
+   * Context creator, do not use directly, always use the
+   * getter as it lazily creates the object in the case
+   * the setConfiguration() method will be used by the user.
+   */
+  private MockReduceContextWrapper<K1, V1, K2, V2> wrapper;
 
 
   public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
@@ -136,6 +139,7 @@ public class ReduceDriver<K1, V1, K2, V2> extends
     try {
       preRunChecks(myReducer);
       initDistributedCache();
+      MockReduceContextWrapper<K1, V1, K2, V2> wrapper = getContextWrapper();
       myReducer.run(wrapper.getMockContext());
       return wrapper.getOutputs();
     } catch (final InterruptedException ie) {
@@ -150,6 +154,14 @@ public class ReduceDriver<K1, V1, K2, V2> extends
     return "ReduceDriver (0.20+) (" + myReducer + ")";
   }
 
+  private MockReduceContextWrapper<K1, V1, K2, V2> getContextWrapper() {
+    if(wrapper == null) {
+      wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
+          getConfiguration(), inputs, mockOutputCreator, this);
+    }
+    return wrapper;
+  }
+  
   /**
    * <p>Obtain Context object for furthering mocking with Mockito.
    * For example, causing write() to throw an exception:</p>
@@ -175,7 +187,7 @@ public class ReduceDriver<K1, V1, K2, V2> extends
    * @return the mocked context
    */
   public Reducer<K1, V1, K2, V2>.Context getContext() {
-    return wrapper.getMockContext();
+    return getContextWrapper().getMockContext();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5bc10864/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
index 50a8934..fb5360e 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
@@ -252,5 +252,4 @@ public class TestDistributedCache {
       .withOutput(new Text("testarchive.tar/d"), new Text("file"))
       .runTest(false);
   }
-
 }