You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrunit.apache.org by br...@apache.org on 2013/03/01 14:21:05 UTC
git commit: MRUNIT-167: context passed to an OutputFormatter is
ignored by the framework
Updated Branches:
refs/heads/trunk 0c6460834 -> 0d261d629
MRUNIT-167: context passed to an OutputFormatter is ignored by the framework
Project: http://git-wip-us.apache.org/repos/asf/mrunit/repo
Commit: http://git-wip-us.apache.org/repos/asf/mrunit/commit/0d261d62
Tree: http://git-wip-us.apache.org/repos/asf/mrunit/tree/0d261d62
Diff: http://git-wip-us.apache.org/repos/asf/mrunit/diff/0d261d62
Branch: refs/heads/trunk
Commit: 0d261d6298ed2377a85fdeb60d574a62c204b4b7
Parents: 0c64608
Author: Brock Noland <br...@apache.org>
Authored: Fri Mar 1 07:21:01 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Fri Mar 1 07:21:01 2013 -0600
----------------------------------------------------------------------
.../java/org/apache/hadoop/mrunit/MapDriver.java | 2 +-
.../org/apache/hadoop/mrunit/ReduceDriver.java | 2 +-
.../mapreduce/AbstractMockContextWrapper.java | 4 +-
.../mapreduce/MockMapreduceOutputFormat.java | 94 ++++-----------
.../mrunit/internal/output/MockOutputCreator.java | 26 +++-
.../hadoop/mrunit/mapreduce/TestMapDriver.java | 42 +++++++
6 files changed, 88 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mrunit/blob/0d261d62/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
index b02b5fc..7526fbd 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
@@ -128,7 +128,7 @@ public class MapDriver<K1, V1, K2, V2> extends MapDriverBase<K1, V1, K2, V2, Map
preRunChecks(myMapper);
initDistributedCache();
final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
- .createOutputCollectable(getConfiguration(),
+ .createMapredOutputCollectable(getConfiguration(),
getOutputSerializationConfiguration());
final MockReporter reporter = new MockReporter(
MockReporter.ReporterType.Mapper, getCounters(),
http://git-wip-us.apache.org/repos/asf/mrunit/blob/0d261d62/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
index 20d2de4..8ef1a3b 100644
--- a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
@@ -131,7 +131,7 @@ public class ReduceDriver<K1, V1, K2, V2> extends
preRunChecks(myReducer);
initDistributedCache();
final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
- .createOutputCollectable(getConfiguration(),
+ .createMapredOutputCollectable(getConfiguration(),
getOutputSerializationConfiguration());
final MockReporter reporter = new MockReporter(
MockReporter.ReporterType.Reducer, getCounters());
http://git-wip-us.apache.org/repos/asf/mrunit/blob/0d261d62/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
index be8beb2..f0206a9 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
@@ -84,8 +84,8 @@ extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> {
final Object[] args = invocation.getArguments();
try {
if(outputCollectable == null) {
- outputCollectable = mockOutputCreator.createOutputCollectable(contextDriver.getConfiguration(),
- contextDriver.getOutputSerializationConfiguration());
+ outputCollectable = mockOutputCreator.createMapReduceOutputCollectable(contextDriver.getConfiguration(),
+ contextDriver.getOutputSerializationConfiguration(), context);
}
outputCollectable.collect((KEYOUT)args[0], (VALUEOUT)args[1]);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/mrunit/blob/0d261d62/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
index 831253a..47a3dfd 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
@@ -17,21 +17,18 @@
*/
package org.apache.hadoop.mrunit.internal.mapreduce;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -48,13 +45,7 @@ public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V>
private static String ATTEMPT = "attempt_000000000000_0000_m_000000_0";
private static TaskAttemptID TASK_ID = TaskAttemptID.forName(ATTEMPT);
- private static final Class<?>[] TASK_ATTEMPT_CONTEXT_CLASSES = new Class<?>[] {
- Configuration.class, TaskAttemptID.class };
- private static final Class<?>[] JOB_CONTEXT_CLASSES = new Class<?>[] {
- Configuration.class, JobID.class };
- private final Job outputFormatJob;
- private final Job inputFormatJob;
private final File outputPath = new File(
System.getProperty("java.io.tmpdir"), "mrunit-" + Math.random());
private TaskAttemptContext taskAttemptContext;
@@ -69,11 +60,10 @@ public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V>
@SuppressWarnings("rawtypes")
public MockMapreduceOutputFormat(Job outputFormatJob,
Class<? extends OutputFormat> outputFormatClass,
- Class<? extends InputFormat> inputFormatClass, Job inputFormatJob)
+ Class<? extends InputFormat> inputFormatClass, Job inputFormatJob,
+ TaskAttemptContext taskAttemptContext)
throws IOException {
- this.outputFormatJob = outputFormatJob;
- this.inputFormatJob = inputFormatJob;
-
+ this.taskAttemptContext = taskAttemptContext;
outputFormat = ReflectionUtils.newInstance(outputFormatClass,
outputFormatJob.getConfiguration());
inputFormat = ReflectionUtils.newInstance(inputFormatClass,
@@ -86,66 +76,34 @@ public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V>
if (!outputPath.mkdir()) {
throw new IOException("Failed to create output dir " + outputPath);
}
- FileOutputFormat.setOutputPath(outputFormatJob,
- new Path(outputPath.toString()));
- }
-
- private void setClassIfUnset(String name, Class<?> classType) {
- outputFormatJob.getConfiguration().setIfUnset(name, classType.getName());
- }
-
- private Object createObject(String primaryClassName,
- String secondaryClassName, Class<?>[] constructorParametersClasses,
- Object... constructorParameters) {
- try {
- Class<?> classType = Class.forName(primaryClassName);
- try {
- Constructor<?> constructor = classType
- .getConstructor(constructorParametersClasses);
- return constructor.newInstance(constructorParameters);
- } catch (SecurityException e) {
- throw new IllegalStateException(e);
- } catch (NoSuchMethodException e) {
- throw new IllegalStateException(e);
- } catch (IllegalArgumentException e) {
- throw new IllegalStateException(e);
- } catch (InstantiationException e) {
- throw new IllegalStateException(e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException(e);
- } catch (InvocationTargetException e) {
- throw new IllegalStateException(e);
- }
- } catch (ClassNotFoundException e) {
- if (secondaryClassName == null) {
- throw new IllegalStateException(e);
- }
- return createObject(secondaryClassName, null,
- constructorParametersClasses, constructorParameters);
- }
+ taskAttemptContext.getConfiguration().set(FileOutputFormat.OUTDIR,
+ new Path(outputPath.toString()).toString());
+ taskAttemptContext.getConfiguration().set(FileInputFormat.INPUT_DIR,
+ new Path((outputPath + "/*/*/*/*")).toString());
}
-
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void collect(K key, V value) throws IOException {
try {
if (recordWriter == null) {
- setClassIfUnset("mapred.output.key.class", key.getClass());
- setClassIfUnset("mapred.output.value.class", value.getClass());
-
- taskAttemptContext = (TaskAttemptContext) createObject(
- "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",
- "org.apache.hadoop.mapreduce.TaskAttemptContext",
- TASK_ATTEMPT_CONTEXT_CLASSES, outputFormatJob.getConfiguration(),
- TASK_ID);
+ if(taskAttemptContext.getOutputKeyClass() == null) {
+ when(taskAttemptContext.getOutputKeyClass()).thenReturn((Class)key.getClass());
+ }
+ if(taskAttemptContext.getOutputValueClass() == null) {
+ when(taskAttemptContext.getOutputValueClass()).thenReturn((Class)value.getClass());
+ }
+ if(taskAttemptContext.getTaskAttemptID() == null) {
+ when(taskAttemptContext.getTaskAttemptID()).thenReturn(TASK_ID);
+ }
recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
}
-
recordWriter.write(key, value);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
+ @SuppressWarnings({ "unchecked"})
@Override
public List<Pair<K, V>> getOutputs() throws IOException {
try {
@@ -155,14 +113,9 @@ public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V>
}
final Serialization serialization = new Serialization(
- inputFormatJob.getConfiguration());
- FileInputFormat.setInputPaths(inputFormatJob, outputPath + "/*/*/*/*");
+ taskAttemptContext.getConfiguration());
try {
- List<InputSplit> inputSplits = inputFormat
- .getSplits((JobContext) createObject(
- "org.apache.hadoop.mapreduce.task.JobContextImpl",
- "org.apache.hadoop.mapreduce.JobContext", JOB_CONTEXT_CLASSES,
- inputFormatJob.getConfiguration(), new JobID()));
+ List<InputSplit> inputSplits = inputFormat.getSplits(taskAttemptContext);
for (InputSplit inputSplit : inputSplits) {
RecordReader<K, V> recordReader = inputFormat.createRecordReader(
inputSplit, taskAttemptContext);
@@ -179,5 +132,4 @@ public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V>
FileUtil.fullyDelete(outputPath);
return outputs;
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mrunit/blob/0d261d62/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
index e9fb98a..1ffb9c6 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mrunit.internal.mapred.MockMapredOutputFormat;
import org.apache.hadoop.mrunit.internal.mapreduce.MockMapreduceOutputFormat;
@@ -51,19 +52,30 @@ public class MockOutputCreator<K, V> {
mapreduceInputFormatClass = returnNonNull(inputFormatClass);
}
- public OutputCollectable<K, V> createOutputCollectable(
+ @SuppressWarnings("deprecation")
+ public OutputCollectable<K, V> createMapReduceOutputCollectable(
Configuration configuration,
- Configuration outputCopyingOrInputFormatConfiguration) throws IOException {
+ Configuration outputCopyingOrInputFormatConfiguration,
+ TaskInputOutputContext context) throws IOException {
outputCopyingOrInputFormatConfiguration = outputCopyingOrInputFormatConfiguration == null ? configuration
: outputCopyingOrInputFormatConfiguration;
- if (mapredOutputFormatClass != null) {
- return new MockMapredOutputFormat<K, V>(new JobConf(configuration),
- mapredOutputFormatClass, mapredInputFormatClass, new JobConf(
- outputCopyingOrInputFormatConfiguration));
- }
if (mapreduceOutputFormatClass != null) {
return new MockMapreduceOutputFormat<K, V>(new Job(configuration),
mapreduceOutputFormatClass, mapreduceInputFormatClass, new Job(
+ outputCopyingOrInputFormatConfiguration),
+ context);
+ }
+ return new MockOutputCollector<K, V>(
+ outputCopyingOrInputFormatConfiguration);
+ }
+ public OutputCollectable<K, V> createMapredOutputCollectable(
+ Configuration configuration,
+ Configuration outputCopyingOrInputFormatConfiguration) throws IOException {
+ outputCopyingOrInputFormatConfiguration = outputCopyingOrInputFormatConfiguration == null ? configuration
+ : outputCopyingOrInputFormatConfiguration;
+ if (mapredOutputFormatClass != null) {
+ return new MockMapredOutputFormat<K, V>(new JobConf(configuration),
+ mapredOutputFormatClass, mapredInputFormatClass, new JobConf(
outputCopyingOrInputFormatConfiguration));
}
return new MockOutputCollector<K, V>(
http://git-wip-us.apache.org/repos/asf/mrunit/blob/0d261d62/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
index ac4f1d7..20acd18 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -34,10 +35,14 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mrunit.ExpectedSuppliedException;
@@ -366,6 +371,43 @@ public class TestMapDriver {
driver.runTest();
}
+ // tests to ensure counters can be incremented in record writer
+ private static enum RecordCounter {
+ NUM_RECORDS
+ };
+ public static class CountingOutputFormat extends FileOutputFormat<Text, Text> {
+ private final TextOutputFormat<Text, Text> delegate;
+ public CountingOutputFormat() {
+ delegate = new TextOutputFormat<Text, Text>();
+ }
+ @Override
+ public RecordWriter<Text, Text> getRecordWriter(final TaskAttemptContext job)
+ throws IOException, InterruptedException {
+ final RecordWriter<Text, Text> writer = delegate.getRecordWriter(job);
+ return new RecordWriter<Text, Text>() {
+ @Override
+ public void write(Text key, Text value) throws IOException,
+ InterruptedException {
+ job.getCounter(RecordCounter.NUM_RECORDS).increment(1);
+ writer.write(key, value);
+ }
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ writer.close(context);
+ }
+ };
+ }
+ }
+ @Test
+ public void testCountingOutputFormat() throws IOException {
+ driver.withOutputFormat(CountingOutputFormat.class,
+ KeyValueTextInputFormat.class);
+ driver.withInput(new Text("a"), new Text("1"));
+ driver.withOutput(new Text("a"), new Text("1"));
+ driver.withCounter(RecordCounter.NUM_RECORDS, 1);
+ driver.runTest();
+ }
@Test
public void testOutputFormat() throws IOException {
driver.withOutputFormat(SequenceFileOutputFormat.class,