You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrunit.apache.org by br...@apache.org on 2012/08/09 21:13:38 UTC
svn commit: r1371388 - in /mrunit/trunk/src:
main/java/org/apache/hadoop/mrunit/
main/java/org/apache/hadoop/mrunit/internal/mapred/
main/java/org/apache/hadoop/mrunit/mapreduce/
test/java/org/apache/hadoop/mrunit/
Author: brock
Date: Thu Aug 9 19:13:38 2012
New Revision: 1371388
URL: http://svn.apache.org/viewvc?rev=1371388&view=rev
Log:
MRUNIT-133: MapInput stuff should be moved from TestDriver to MapDriver
Added:
mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java
Modified:
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java
mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java Thu Aug 9 19:13:38 2012
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.mrunit;
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
import java.io.IOException;
import java.util.List;
@@ -82,7 +82,7 @@ public class MapDriver<K1, V1, K2, V2> e
setCounters(ctrs);
return this;
}
-
+
/**
* Set the Mapper instance to use with this test driver
*
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java Thu Aug 9 19:13:38 2012
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
import org.apache.hadoop.mrunit.types.Pair;
@@ -41,7 +42,9 @@ public abstract class MapDriverBase<K1,
public static final Log LOG = LogFactory.getLog(MapDriverBase.class);
protected List<Pair<K1, V1>> inputs = new ArrayList<Pair<K1, V1>>();
-
+
+ protected Path mapInputPath = new Path("somefile");
+
@Deprecated
protected K1 inputKey;
@Deprecated
@@ -97,6 +100,7 @@ public abstract class MapDriverBase<K1,
* @param inputRecord
* a (key, val) pair
*/
+ @SuppressWarnings("deprecation")
public void setInput(final Pair<K1, V1> inputRecord) {
setInputKey(inputRecord.getFirst());
setInputValue(inputRecord.getSecond());
@@ -340,8 +344,33 @@ public abstract class MapDriverBase<K1,
}
/**
+ * @return the path passed to the mapper InputSplit
+ */
+ public Path getMapInputPath() {
+ return mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath Path which is to be passed to the mapper's InputSplit
+ */
+ public void setMapInputPath(Path mapInputPath) {
+ this.mapInputPath = mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath
+ * The Path object which will be given to the mapper
+ * @return this driver instance, to allow call chaining
+ */
+ public final T withMapInputPath(Path mapInputPath) {
+ setMapInputPath(mapInputPath);
+ return thisAsTestDriver();
+ }
+
+ /**
* Handle inputKey and inputVal for backwards compatibility.
*/
+ @SuppressWarnings("deprecation")
protected void preRunChecks(Object mapper) {
if (inputKey != null && inputVal != null) {
setInput(inputKey, inputVal);
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Thu Aug 9 19:13:38 2012
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
@@ -51,6 +52,8 @@ public abstract class MapReduceDriverBas
public static final Log LOG = LogFactory.getLog(MapReduceDriverBase.class);
protected List<Pair<K1, V1>> inputList = new ArrayList<Pair<K1, V1>>();
+
+ protected Path mapInputPath = new Path("somefile");
/** Key group comparator */
protected Comparator<K2> keyGroupComparator;
@@ -270,6 +273,30 @@ public abstract class MapReduceDriverBas
addAllOutput(outputRecords);
return thisAsMapReduceDriver();
}
+
+ /**
+ * @return the path passed to the mapper InputSplit
+ */
+ public Path getMapInputPath() {
+ return mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath Path which is to be passed to the mapper's InputSplit
+ */
+ public void setMapInputPath(Path mapInputPath) {
+ this.mapInputPath = mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath
+ * The Path object which will be given to the mapper
+ * @return this driver instance, to allow call chaining
+ */
+ public final T withMapInputPath(Path mapInputPath) {
+ setMapInputPath(mapInputPath);
+ return thisAsTestDriver();
+ }
protected void preRunChecks(Object mapper, Object reducer) {
if (inputList.isEmpty()) {
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java Thu Aug 9 19:13:38 2012
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
@@ -61,6 +62,8 @@ public class PipelineMapReduceDriver<K1,
private List<Pair<Mapper, Reducer>> mapReducePipeline;
private final List<Pair<K1, V1>> inputList;
private Counters counters;
+
+ protected Path mapInputPath = new Path("somefile");
public PipelineMapReduceDriver(final List<Pair<Mapper, Reducer>> pipeline) {
this();
@@ -308,6 +311,30 @@ public class PipelineMapReduceDriver<K1,
addOutputFromString(output);
return this;
}
+
+ /**
+ * @return the path passed to the mapper InputSplit
+ */
+ public Path getMapInputPath() {
+ return mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath Path which is to be passed to the mapper's InputSplit
+ */
+ public void setMapInputPath(Path mapInputPath) {
+ this.mapInputPath = mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath
+ * The Path object which will be given to the mapper
+ * @return this driver instance, to allow call chaining
+ */
+ public final PipelineMapReduceDriver<K1, V1, K2, V2> withMapInputPath(Path mapInputPath) {
+ setMapInputPath(mapInputPath);
+ return this;
+ }
@Override
@SuppressWarnings("unchecked")
@@ -329,6 +356,7 @@ public class PipelineMapReduceDriver<K1,
mrDriver.setCounters(getCounters());
mrDriver.setConfiguration(configuration);
+ mrDriver.setMapInputPath(mapInputPath);
// Add the inputs from the user, or from the previous stage of the
// pipeline.
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java Thu Aug 9 19:13:38 2012
@@ -126,8 +126,7 @@ public class ReduceDriver<K1, V1, K2, V2
.createOutputCollectable(getConfiguration(),
getOutputCopyingOrInputFormatConfiguration());
final MockReporter reporter = new MockReporter(
- MockReporter.ReporterType.Reducer, getCounters(),
- getMapInputPath());
+ MockReporter.ReporterType.Reducer, getCounters());
ReflectionUtils.setConf(myReducer, new JobConf(getConfiguration()));
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java Thu Aug 9 19:13:38 2012
@@ -56,8 +56,6 @@ public abstract class TestDriver<K1, V1,
protected CounterWrapper counterWrapper;
protected Serialization serialization;
-
- protected Path mapInputPath = new Path("somefile");
public TestDriver() {
expectedOutputs = new ArrayList<Pair<K2, V2>>();
@@ -104,7 +102,7 @@ public abstract class TestDriver<K1, V1,
}
@SuppressWarnings("unchecked")
- private T thisAsTestDriver() {
+ protected T thisAsTestDriver() {
return (T) this;
}
@@ -193,30 +191,6 @@ public abstract class TestDriver<K1, V1,
}
/**
- * @return the path passed to the mapper InputSplit
- */
- public Path getMapInputPath() {
- return mapInputPath;
- }
-
- /**
- * @param mapInputPath Path which is to be passed to the mappers InputSplit
- */
- public void setMapInputPath(Path mapInputPath) {
- this.mapInputPath = mapInputPath;
- }
-
- /**
- * @param mapInputPath
- * The Path object which will be given to the mapper
- * @return
- */
- public final T withMapInputPath(Path mapInputPath) {
- setMapInputPath(mapInputPath);
- return thisAsTestDriver();
- }
-
- /**
* Adds a file to be put on the distributed cache.
* The path may be relative and will try to be resolved from
* the classpath of the test.
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java Thu Aug 9 19:13:38 2012
@@ -34,11 +34,19 @@ public class MockReporter implements Rep
private final ReporterType typ;
+ public MockReporter(final ReporterType kind, final Counters ctrs) {
+ this(kind, ctrs, null);
+ }
+
public MockReporter(final ReporterType kind, final Counters ctrs,
final Path mapInputPath) {
typ = kind;
counters = ctrs;
- inputSplit = new MockInputSplit(mapInputPath);
+ if(mapInputPath == null) {
+ inputSplit = null;
+ } else {
+ inputSplit = new MockInputSplit(mapInputPath);
+ }
}
@Override
Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java Thu Aug 9 19:13:38 2012
@@ -18,14 +18,13 @@
package org.apache.hadoop.mrunit.mapreduce;
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.OutputFormat;
Added: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java?rev=1371388&view=auto
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java (added)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java Thu Aug 9 19:13:38 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+class InputPathStoringMapper<VALUEIN, VALUEOUT> extends MapReduceBase implements
+ Mapper<Text, VALUEIN, Text, VALUEOUT> {
+ private Path mapInputPath;
+
+ @Override
+ public void map(Text key, VALUEIN value, OutputCollector<Text, VALUEOUT> output,
+ Reporter reporter) throws IOException {
+ if (reporter.getInputSplit() instanceof FileSplit) {
+ mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
+ }
+ }
+
+ Path getMapInputPath() {
+ return mapInputPath;
+ }
+}
\ No newline at end of file
Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java Thu Aug 9 19:13:38 2012
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
@@ -543,26 +542,9 @@ public class TestMapDriver {
driver.runTest();
}
- private static class InputPathStoringMapper extends MapReduceBase implements
- Mapper<Text, Text, Text, Text> {
- private Path mapInputPath;
-
- @Override
- public void map(Text key, Text value, OutputCollector<Text, Text> output,
- Reporter reporter) throws IOException {
- if (reporter.getInputSplit() instanceof FileSplit) {
- mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
- }
- }
-
- private Path getMapInputPath() {
- return mapInputPath;
- }
- }
-
@Test
public void testMapInputFile() throws IOException {
- InputPathStoringMapper mapper = new InputPathStoringMapper();
+ InputPathStoringMapper<Text, Text> mapper = new InputPathStoringMapper<Text, Text>();
Path mapInputPath = new Path("myfile");
driver = MapDriver.newMapDriver(mapper);
driver.setMapInputPath(mapInputPath);
Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java Thu Aug 9 19:13:38 2012
@@ -566,27 +566,10 @@ public class TestMapReduceDriver {
driver.runTest();
}
- private static class InputPathStoringMapper extends MapReduceBase implements
- Mapper<Text, LongWritable, Text, LongWritable> {
- private Path mapInputPath;
-
- @Override
- public void map(Text key, LongWritable value,
- OutputCollector<Text, LongWritable> output, Reporter reporter)
- throws IOException {
- if (reporter.getInputSplit() instanceof FileSplit) {
- mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
- }
- }
-
- private Path getMapInputPath() {
- return mapInputPath;
- }
- }
-
@Test
public void testMapInputFile() throws IOException {
- InputPathStoringMapper mapper = new InputPathStoringMapper();
+ InputPathStoringMapper<LongWritable,LongWritable> mapper =
+ new InputPathStoringMapper<LongWritable,LongWritable>();
Path mapInputPath = new Path("myfile");
driver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
driver.setMapInputPath(mapInputPath);
Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java Thu Aug 9 19:13:38 2012
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.mrunit;
+import static org.junit.Assert.*;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -306,4 +309,20 @@ public class TestPipelineMapReduceDriver
driver.addOutput(key, value);
driver.runTest();
}
+
+ @Test
+ public void testMapInputFile() throws IOException {
+ InputPathStoringMapper<LongWritable,LongWritable> mapper =
+ new InputPathStoringMapper<LongWritable,LongWritable>();
+ Path mapInputPath = new Path("myfile");
+ final PipelineMapReduceDriver<Text, LongWritable, Text, LongWritable> driver = PipelineMapReduceDriver
+ .newPipelineMapReduceDriver();
+ driver.addMapReduce(mapper, new IdentityReducer<Text, LongWritable>());
+ driver.setMapInputPath(mapInputPath);
+ assertEquals(mapInputPath.getName(), driver.getMapInputPath().getName());
+ driver.withInput(new Text("a"), new LongWritable(1));
+ driver.runTest();
+ assertNotNull(mapper.getMapInputPath());
+ assertEquals(mapInputPath.getName(), mapper.getMapInputPath().getName());
+ }
}