You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrunit.apache.org by br...@apache.org on 2013/01/07 21:07:47 UTC
[1/2] git commit: MRUNIT-161: some distributed cache apis not working
- JobContext getCacheArchives()
MRUNIT-161: some distributed cache apis not working - JobContext getCacheArchives()
Project: http://git-wip-us.apache.org/repos/asf/mrunit/repo
Commit: http://git-wip-us.apache.org/repos/asf/mrunit/commit/cb76b50c
Tree: http://git-wip-us.apache.org/repos/asf/mrunit/tree/cb76b50c
Diff: http://git-wip-us.apache.org/repos/asf/mrunit/diff/cb76b50c
Branch: refs/heads/trunk-hadoop1
Commit: cb76b50c2ce5257034a87ddbca72ec4185bd1e79
Parents: c697735
Author: Brock Noland <br...@apache.org>
Authored: Mon Jan 7 14:01:10 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Mon Jan 7 14:01:58 2013 -0600
----------------------------------------------------------------------
.../org/apache/hadoop/mrunit/MapReduceDriver.java | 4 +-
.../hadoop/mrunit/PipelineMapReduceDriver.java | 2 +-
.../java/org/apache/hadoop/mrunit/TestDriver.java | 45 ++++++++++++--
.../mapreduce/AbstractMockContextWrapper.java | 45 ++++++++++++++-
.../internal/mapreduce/MockMapContextWrapper.java | 7 ++-
.../mapreduce/MockReduceContextWrapper.java | 4 +-
.../apache/hadoop/mrunit/mapreduce/MapDriver.java | 20 +++++-
.../hadoop/mrunit/mapreduce/MapReduceDriver.java | 4 +-
.../hadoop/mrunit/mapreduce/ReduceDriver.java | 20 +++++-
.../mrunit/mapreduce/TestDistributedCache.java | 1 -
10 files changed, 127 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mrunit/blob/cb76b50c/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
index 3a5d069..fb56010 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
@@ -226,7 +226,7 @@ public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
.newReduceDriver(reducer).withCounters(getCounters())
- .withConfiguration(configuration).withAll(inputs);
+ .withConfiguration(getConfiguration()).withAll(inputs);
if (getOutputSerializationConfiguration() != null) {
reduceDriver
@@ -254,7 +254,7 @@ public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
// run map component
LOG.debug("Starting map phase with mapper: " + myMapper);
mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
- .withCounters(getCounters()).withConfiguration(configuration)
+ .withCounters(getCounters()).withConfiguration(getConfiguration())
.withAll(inputList).withMapInputPath(getMapInputPath()).run());
if (myCombiner != null) {
http://git-wip-us.apache.org/repos/asf/mrunit/blob/cb76b50c/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
index 9dd7086..96b6b75 100644
--- a/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
@@ -302,7 +302,7 @@ public class PipelineMapReduceDriver<K1, V1, K2, V2> extends
job.getFirst(), job.getSecond());
mrDriver.setCounters(getCounters());
- mrDriver.setConfiguration(configuration);
+ mrDriver.setConfiguration(getConfiguration());
mrDriver.setMapInputPath(mapInputPath);
// Add the inputs from the user, or from the previous stage of the
http://git-wip-us.apache.org/repos/asf/mrunit/blob/cb76b50c/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
index 6d0db44..15fd682 100644
--- a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
@@ -51,19 +51,28 @@ public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2
private boolean strictCountersChecking = false;
protected List<Pair<Enum<?>, Long>> expectedEnumCounters;
protected List<Pair<Pair<String, String>, Long>> expectedStringCounters;
- protected Configuration configuration;
+ /**
+ * Configuration object. Do not use directly; always use the
+ * getter, as it lazily creates the object in case
+ * the setConfiguration() method is used by the user.
+ */
+ private Configuration configuration;
+ /**
+ * Serialization object. Do not use directly; always use the
+ * getter, as it lazily creates the object in case
+ * the setConfiguration() method is used by the user.
+ */
+ private Serialization serialization;
+
private Configuration outputSerializationConfiguration;
private File tmpDistCacheDir;
protected CounterWrapper counterWrapper;
- protected Serialization serialization;
public TestDriver() {
expectedOutputs = new ArrayList<Pair<K2, V2>>();
expectedEnumCounters = new ArrayList<Pair<Enum<?>, Long>>();
expectedStringCounters = new ArrayList<Pair<Pair<String, String>, Long>>();
- configuration = new Configuration();
- serialization = new Serialization(configuration);
}
/**
@@ -252,14 +261,23 @@ public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2
* reducer associated with the driver
*/
public Configuration getConfiguration() {
+ if(configuration == null) {
+ configuration = new Configuration();
+ }
return configuration;
}
/**
* @param configuration
* The configuration object that will given to the mapper and/or
- * reducer associated with the driver
+ * reducer associated with the driver. This method should only be
+ * called directly after the constructor as the internal state
+ * of the driver depends on the configuration object
+ * @deprecated
+ * Use getConfiguration() to set configuration items as opposed to
+ * overriding the entire configuration object as it's used internally.
*/
+ @Deprecated
public void setConfiguration(final Configuration configuration) {
this.configuration = returnNonNull(configuration);
}
@@ -267,9 +285,15 @@ public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2
/**
* @param configuration
* The configuration object that will given to the mapper associated
- * with the driver
+ * with the driver. This method should only be called directly after
+ * the constructor as the internal state of the driver depends on the
+ * configuration object
+ * @deprecated
+ * Use getConfiguration() to set configuration items as opposed to
+ * overriding the entire configuration object as it's used internally.
* @return this object for fluent coding
*/
+ @Deprecated
public T withConfiguration(
final Configuration configuration) {
setConfiguration(configuration);
@@ -435,6 +459,13 @@ public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2
return outputs;
}
+ private Serialization getSerialization() {
+ if(serialization == null) {
+ serialization = new Serialization(getConfiguration());
+ }
+ return serialization;
+ }
+
/**
* Initialises the test distributed cache if required. This
* process is referred to as "localizing" by Hadoop, but since
@@ -574,7 +605,7 @@ public abstract class TestDriver<K1, V1, K2, V2, T extends TestDriver<K1, V1, K2
}
protected <E> E copy(E object) {
- return serialization.copyWithConf(object, configuration);
+ return getSerialization().copyWithConf(object, getConfiguration());
}
protected <S, E> Pair<S, E> copyPair(S first, E second) {
http://git-wip-us.apache.org/repos/asf/mrunit/blob/cb76b50c/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
index 43d0b53..be8beb2 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
@@ -21,12 +21,15 @@ import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
import org.apache.hadoop.mrunit.types.Pair;
@@ -38,10 +41,13 @@ extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> {
protected CONTEXT context;
protected final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator;
+ protected final Configuration configuration;
protected OutputCollectable<KEYOUT, VALUEOUT> outputCollectable;
- public AbstractMockContextWrapper(final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator) {
+ public AbstractMockContextWrapper(final Configuration configuration,
+ final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator) {
this.mockOutputCreator = mockOutputCreator;
+ this.configuration = configuration;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -88,6 +94,43 @@ extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> {
return null;
}
}).when(context).write(any(), any());
+
+
+ when(context.getSymlink()).thenAnswer(new Answer<Boolean>() {
+ @Override
+ @SuppressWarnings("deprecation")
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ return DistributedCache.getSymlink(configuration);
+ }
+ });
+ when(context.getCacheArchives()).thenAnswer(new Answer<URI[]>() {
+ @Override
+ @SuppressWarnings("deprecation")
+ public URI[] answer(InvocationOnMock invocation) throws Throwable {
+ return DistributedCache.getCacheArchives(configuration);
+ }
+ });
+ when(context.getCacheFiles()).thenAnswer(new Answer<URI[]>() {
+ @Override
+ @SuppressWarnings("deprecation")
+ public URI[] answer(InvocationOnMock invocation) throws Throwable {
+ return DistributedCache.getCacheFiles(configuration);
+ }
+ });
+ when(context.getLocalCacheArchives()).thenAnswer(new Answer<Path[]>() {
+ @Override
+ @SuppressWarnings("deprecation")
+ public Path[] answer(InvocationOnMock invocation) throws Throwable {
+ return DistributedCache.getLocalCacheArchives(configuration);
+ }
+ });
+ when(context.getLocalCacheFiles()).thenAnswer(new Answer<Path[]>() {
+ @Override
+ @SuppressWarnings("deprecation")
+ public Path[] answer(InvocationOnMock invocation) throws Throwable {
+ return DistributedCache.getLocalCacheFiles(configuration);
+ }
+ });
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/mrunit/blob/cb76b50c/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
index 0febe85..72a903e 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -55,10 +56,12 @@ public class MockMapContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
protected Pair<KEYIN, VALUEIN> currentKeyValue;
- public MockMapContextWrapper(final List<Pair<KEYIN, VALUEIN>> inputs,
+ public MockMapContextWrapper(
+ final Configuration configuration,
+ final List<Pair<KEYIN, VALUEIN>> inputs,
final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator,
final MapDriver<KEYIN, VALUEIN, KEYOUT, VALUEOUT> driver) {
- super(mockOutputCreator);
+ super(configuration, mockOutputCreator);
this.inputs = inputs;
this.driver = driver;
context = create();
http://git-wip-us.apache.org/repos/asf/mrunit/blob/cb76b50c/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
index 4996074..0bdfd6a 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
@@ -56,10 +57,11 @@ public class MockReduceContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
protected Pair<KEYIN, List<VALUEIN>> currentKeyValue;
public MockReduceContextWrapper(
+ final Configuration configuration,
final List<Pair<KEYIN, List<VALUEIN>>> inputs,
final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator,
final ReduceDriver<KEYIN, VALUEIN, KEYOUT, VALUEOUT> driver) {
- super(mockOutputCreator);
+ super(configuration, mockOutputCreator);
this.inputs = inputs;
this.driver = driver;
context = create();
http://git-wip-us.apache.org/repos/asf/mrunit/blob/cb76b50c/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
index 84f22c1..8f7b0a8 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
@@ -49,9 +49,12 @@ extends MapDriverBase<K1, V1, K2, V2, MapDriver<K1, V1, K2, V2> > implements Con
private Mapper<K1, V1, K2, V2> myMapper;
private Counters counters;
-
- private final MockMapContextWrapper<K1, V1, K2, V2> wrapper = new MockMapContextWrapper<K1, V1, K2, V2>(
- inputs, mockOutputCreator, this);
+ /**
+ * Context creator. Do not use directly; always use the
+ * getter, as it lazily creates the object in case
+ * the setConfiguration() method is used by the user.
+ */
+ private MockMapContextWrapper<K1, V1, K2, V2> wrapper;
public MapDriver(final Mapper<K1, V1, K2, V2> m) {
@@ -130,6 +133,7 @@ extends MapDriverBase<K1, V1, K2, V2, MapDriver<K1, V1, K2, V2> > implements Con
try {
preRunChecks(myMapper);
initDistributedCache();
+ MockMapContextWrapper<K1, V1, K2, V2> wrapper = getContextWrapper();
myMapper.run(wrapper.getMockContext());
return wrapper.getOutputs();
} catch (final InterruptedException ie) {
@@ -144,6 +148,14 @@ extends MapDriverBase<K1, V1, K2, V2, MapDriver<K1, V1, K2, V2> > implements Con
return "MapDriver (0.20+) (" + myMapper + ")";
}
+ private MockMapContextWrapper<K1, V1, K2, V2> getContextWrapper() {
+ if(wrapper == null) {
+ wrapper = new MockMapContextWrapper<K1, V1, K2, V2>(getConfiguration(),
+ inputs, mockOutputCreator, this);
+ }
+ return wrapper;
+ }
+
/**
* <p>Obtain Context object for furthering mocking with Mockito.
* For example, causing write() to throw an exception:</p>
@@ -169,7 +181,7 @@ extends MapDriverBase<K1, V1, K2, V2, MapDriver<K1, V1, K2, V2> > implements Con
* @return the mocked context
*/
public Mapper<K1, V1, K2, V2>.Context getContext() {
- return wrapper.getMockContext();
+ return getContextWrapper().getMockContext();
}
/**
http://git-wip-us.apache.org/repos/asf/mrunit/blob/cb76b50c/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
index cf2abcc..78941c2 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
@@ -224,7 +224,7 @@ public class MapReduceDriver<K1, V1, K2, V2, K3, V3> extends
final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
.newReduceDriver(reducer).withCounters(getCounters())
- .withConfiguration(configuration).withAll(inputs);
+ .withConfiguration(getConfiguration()).withAll(inputs);
if (getOutputSerializationConfiguration() != null) {
reduceDriver
@@ -251,7 +251,7 @@ public class MapReduceDriver<K1, V1, K2, V2, K3, V3> extends
// run map component
LOG.debug("Starting map phase with mapper: " + myMapper);
mapOutputs.addAll(MapDriver.newMapDriver(myMapper)
- .withCounters(getCounters()).withConfiguration(configuration)
+ .withCounters(getCounters()).withConfiguration(getConfiguration())
.withAll(inputList).withMapInputPath(getMapInputPath()).run());
if (myCombiner != null) {
// User has specified a combiner. Run this and replace the mapper outputs
http://git-wip-us.apache.org/repos/asf/mrunit/blob/cb76b50c/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
index 4996450..c7a15b7 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
@@ -51,9 +51,12 @@ public class ReduceDriver<K1, V1, K2, V2> extends
private Reducer<K1, V1, K2, V2> myReducer;
private Counters counters;
-
- private final MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
- inputs, mockOutputCreator, this);
+ /**
+ * Context creator. Do not use directly; always use the
+ * getter, as it lazily creates the object in case
+ * the setConfiguration() method is used by the user.
+ */
+ private MockReduceContextWrapper<K1, V1, K2, V2> wrapper;
public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
@@ -136,6 +139,7 @@ public class ReduceDriver<K1, V1, K2, V2> extends
try {
preRunChecks(myReducer);
initDistributedCache();
+ MockReduceContextWrapper<K1, V1, K2, V2> wrapper = getContextWrapper();
myReducer.run(wrapper.getMockContext());
return wrapper.getOutputs();
} catch (final InterruptedException ie) {
@@ -150,6 +154,14 @@ public class ReduceDriver<K1, V1, K2, V2> extends
return "ReduceDriver (0.20+) (" + myReducer + ")";
}
+ private MockReduceContextWrapper<K1, V1, K2, V2> getContextWrapper() {
+ if(wrapper == null) {
+ wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
+ getConfiguration(), inputs, mockOutputCreator, this);
+ }
+ return wrapper;
+ }
+
/**
* <p>Obtain Context object for furthering mocking with Mockito.
* For example, causing write() to throw an exception:</p>
@@ -175,7 +187,7 @@ public class ReduceDriver<K1, V1, K2, V2> extends
* @return the mocked context
*/
public Reducer<K1, V1, K2, V2>.Context getContext() {
- return wrapper.getMockContext();
+ return getContextWrapper().getMockContext();
}
/**
http://git-wip-us.apache.org/repos/asf/mrunit/blob/cb76b50c/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
index 50a8934..fb5360e 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestDistributedCache.java
@@ -252,5 +252,4 @@ public class TestDistributedCache {
.withOutput(new Text("testarchive.tar/d"), new Text("file"))
.runTest(false);
}
-
}