You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrunit.apache.org by br...@apache.org on 2013/03/01 18:39:23 UTC

[1/2] git commit: MRUNIT-167: context passed to an OutputFormatter is ignored by the framework

MRUNIT-167: context passed to an OutputFormatter is ignored by the framework


Project: http://git-wip-us.apache.org/repos/asf/mrunit/repo
Commit: http://git-wip-us.apache.org/repos/asf/mrunit/commit/5d6edb33
Tree: http://git-wip-us.apache.org/repos/asf/mrunit/tree/5d6edb33
Diff: http://git-wip-us.apache.org/repos/asf/mrunit/diff/5d6edb33

Branch: refs/heads/trunk-hadoop1
Commit: 5d6edb33e8681012ca840c86f7d4a36fc548cf48
Parents: 35676de
Author: Brock Noland <br...@apache.org>
Authored: Fri Mar 1 07:21:01 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Fri Mar 1 11:30:27 2013 -0600

----------------------------------------------------------------------
 .../java/org/apache/hadoop/mrunit/MapDriver.java   |    2 +-
 .../org/apache/hadoop/mrunit/ReduceDriver.java     |    2 +-
 .../mapreduce/AbstractMockContextWrapper.java      |    4 +-
 .../mapreduce/MockMapreduceOutputFormat.java       |   94 ++++-----------
 .../mrunit/internal/output/MockOutputCreator.java  |   26 +++-
 .../hadoop/mrunit/mapreduce/TestMapDriver.java     |   42 +++++++
 6 files changed, 88 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mrunit/blob/5d6edb33/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
index b02b5fc..7526fbd 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
@@ -128,7 +128,7 @@ public class MapDriver<K1, V1, K2, V2> extends MapDriverBase<K1, V1, K2, V2, Map
       preRunChecks(myMapper);
       initDistributedCache();
       final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
-          .createOutputCollectable(getConfiguration(),
+          .createMapredOutputCollectable(getConfiguration(),
               getOutputSerializationConfiguration());
       final MockReporter reporter = new MockReporter(
           MockReporter.ReporterType.Mapper, getCounters(),

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5d6edb33/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
index 20d2de4..8ef1a3b 100644
--- a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
@@ -131,7 +131,7 @@ public class ReduceDriver<K1, V1, K2, V2> extends
       preRunChecks(myReducer);
       initDistributedCache();
       final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
-          .createOutputCollectable(getConfiguration(),
+          .createMapredOutputCollectable(getConfiguration(),
               getOutputSerializationConfiguration());
       final MockReporter reporter = new MockReporter(
           MockReporter.ReporterType.Reducer, getCounters());

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5d6edb33/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
index 9fe84c5..e9df500 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
@@ -84,8 +84,8 @@ extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> {
           final Object[] args = invocation.getArguments();
           try {
             if(outputCollectable == null) {
-              outputCollectable = mockOutputCreator.createOutputCollectable(contextDriver.getConfiguration(), 
-                  contextDriver.getOutputSerializationConfiguration());
+              outputCollectable = mockOutputCreator.createMapReduceOutputCollectable(contextDriver.getConfiguration(), 
+                  contextDriver.getOutputSerializationConfiguration(), context);
             }
             outputCollectable.collect((KEYOUT)args[0], (VALUEOUT)args[1]);
           } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5d6edb33/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
index 831253a..47a3dfd 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
@@ -17,21 +17,18 @@
  */
 package org.apache.hadoop.mrunit.internal.mapreduce;
 
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -48,13 +45,7 @@ public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V>
 
   private static String ATTEMPT = "attempt_000000000000_0000_m_000000_0";
   private static TaskAttemptID TASK_ID = TaskAttemptID.forName(ATTEMPT);
-  private static final Class<?>[] TASK_ATTEMPT_CONTEXT_CLASSES = new Class<?>[] {
-      Configuration.class, TaskAttemptID.class };
-  private static final Class<?>[] JOB_CONTEXT_CLASSES = new Class<?>[] {
-      Configuration.class, JobID.class };
 
-  private final Job outputFormatJob;
-  private final Job inputFormatJob;
   private final File outputPath = new File(
       System.getProperty("java.io.tmpdir"), "mrunit-" + Math.random());
   private TaskAttemptContext taskAttemptContext;
@@ -69,11 +60,10 @@ public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V>
   @SuppressWarnings("rawtypes")
   public MockMapreduceOutputFormat(Job outputFormatJob,
       Class<? extends OutputFormat> outputFormatClass,
-      Class<? extends InputFormat> inputFormatClass, Job inputFormatJob)
+      Class<? extends InputFormat> inputFormatClass, Job inputFormatJob,
+      TaskAttemptContext taskAttemptContext)
       throws IOException {
-    this.outputFormatJob = outputFormatJob;
-    this.inputFormatJob = inputFormatJob;
-
+    this.taskAttemptContext = taskAttemptContext;
     outputFormat = ReflectionUtils.newInstance(outputFormatClass,
         outputFormatJob.getConfiguration());
     inputFormat = ReflectionUtils.newInstance(inputFormatClass,
@@ -86,66 +76,34 @@ public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V>
     if (!outputPath.mkdir()) {
       throw new IOException("Failed to create output dir " + outputPath);
     }
-    FileOutputFormat.setOutputPath(outputFormatJob,
-        new Path(outputPath.toString()));
-  }
-
-  private void setClassIfUnset(String name, Class<?> classType) {
-    outputFormatJob.getConfiguration().setIfUnset(name, classType.getName());
-  }
-
-  private Object createObject(String primaryClassName,
-      String secondaryClassName, Class<?>[] constructorParametersClasses,
-      Object... constructorParameters) {
-    try {
-      Class<?> classType = Class.forName(primaryClassName);
-      try {
-        Constructor<?> constructor = classType
-            .getConstructor(constructorParametersClasses);
-        return constructor.newInstance(constructorParameters);
-      } catch (SecurityException e) {
-        throw new IllegalStateException(e);
-      } catch (NoSuchMethodException e) {
-        throw new IllegalStateException(e);
-      } catch (IllegalArgumentException e) {
-        throw new IllegalStateException(e);
-      } catch (InstantiationException e) {
-        throw new IllegalStateException(e);
-      } catch (IllegalAccessException e) {
-        throw new IllegalStateException(e);
-      } catch (InvocationTargetException e) {
-        throw new IllegalStateException(e);
-      }
-    } catch (ClassNotFoundException e) {
-      if (secondaryClassName == null) {
-        throw new IllegalStateException(e);
-      }
-      return createObject(secondaryClassName, null,
-          constructorParametersClasses, constructorParameters);
-    }
+    taskAttemptContext.getConfiguration().set(FileOutputFormat.OUTDIR, 
+        new Path(outputPath.toString()).toString());
+    taskAttemptContext.getConfiguration().set(FileInputFormat.INPUT_DIR, 
+        new Path((outputPath + "/*/*/*/*")).toString());
   }
-
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
   public void collect(K key, V value) throws IOException {
     try {
       if (recordWriter == null) {
-        setClassIfUnset("mapred.output.key.class", key.getClass());
-        setClassIfUnset("mapred.output.value.class", value.getClass());
-
-        taskAttemptContext = (TaskAttemptContext) createObject(
-            "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",
-            "org.apache.hadoop.mapreduce.TaskAttemptContext",
-            TASK_ATTEMPT_CONTEXT_CLASSES, outputFormatJob.getConfiguration(),
-            TASK_ID);
+        if(taskAttemptContext.getOutputKeyClass() == null) {
+          when(taskAttemptContext.getOutputKeyClass()).thenReturn((Class)key.getClass());
+        }
+        if(taskAttemptContext.getOutputValueClass() == null) {
+          when(taskAttemptContext.getOutputValueClass()).thenReturn((Class)value.getClass());
+        }
+        if(taskAttemptContext.getTaskAttemptID() == null) {
+          when(taskAttemptContext.getTaskAttemptID()).thenReturn(TASK_ID);
+        }
         recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
       }
-
       recordWriter.write(key, value);
     } catch (InterruptedException e) {
       throw new IllegalStateException(e);
     }
   }
 
+  @SuppressWarnings({ "unchecked"})
   @Override
   public List<Pair<K, V>> getOutputs() throws IOException {
     try {
@@ -155,14 +113,9 @@ public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V>
     }
 
     final Serialization serialization = new Serialization(
-        inputFormatJob.getConfiguration());
-    FileInputFormat.setInputPaths(inputFormatJob, outputPath + "/*/*/*/*");
+        taskAttemptContext.getConfiguration());
     try {
-      List<InputSplit> inputSplits = inputFormat
-          .getSplits((JobContext) createObject(
-              "org.apache.hadoop.mapreduce.task.JobContextImpl",
-              "org.apache.hadoop.mapreduce.JobContext", JOB_CONTEXT_CLASSES,
-              inputFormatJob.getConfiguration(), new JobID()));
+      List<InputSplit> inputSplits = inputFormat.getSplits(taskAttemptContext);
       for (InputSplit inputSplit : inputSplits) {
         RecordReader<K, V> recordReader = inputFormat.createRecordReader(
             inputSplit, taskAttemptContext);
@@ -179,5 +132,4 @@ public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V>
     FileUtil.fullyDelete(outputPath);
     return outputs;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5d6edb33/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
index e9fb98a..1ffb9c6 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mrunit.internal.mapred.MockMapredOutputFormat;
 import org.apache.hadoop.mrunit.internal.mapreduce.MockMapreduceOutputFormat;
 
@@ -51,19 +52,30 @@ public class MockOutputCreator<K, V> {
     mapreduceInputFormatClass = returnNonNull(inputFormatClass);
   }
 
-  public OutputCollectable<K, V> createOutputCollectable(
+  @SuppressWarnings("deprecation")
+  public OutputCollectable<K, V> createMapReduceOutputCollectable(
       Configuration configuration,
-      Configuration outputCopyingOrInputFormatConfiguration) throws IOException {
+      Configuration outputCopyingOrInputFormatConfiguration,
+      TaskInputOutputContext context) throws IOException {
     outputCopyingOrInputFormatConfiguration = outputCopyingOrInputFormatConfiguration == null ? configuration
         : outputCopyingOrInputFormatConfiguration;
-    if (mapredOutputFormatClass != null) {
-      return new MockMapredOutputFormat<K, V>(new JobConf(configuration),
-          mapredOutputFormatClass, mapredInputFormatClass, new JobConf(
-              outputCopyingOrInputFormatConfiguration));
-    }
     if (mapreduceOutputFormatClass != null) {
       return new MockMapreduceOutputFormat<K, V>(new Job(configuration),
           mapreduceOutputFormatClass, mapreduceInputFormatClass, new Job(
+              outputCopyingOrInputFormatConfiguration),
+              context);
+    }
+    return new MockOutputCollector<K, V>(
+        outputCopyingOrInputFormatConfiguration);
+  }
+  public OutputCollectable<K, V> createMapredOutputCollectable(
+      Configuration configuration,
+      Configuration outputCopyingOrInputFormatConfiguration) throws IOException {
+    outputCopyingOrInputFormatConfiguration = outputCopyingOrInputFormatConfiguration == null ? configuration
+        : outputCopyingOrInputFormatConfiguration;    
+    if (mapredOutputFormatClass != null) {
+      return new MockMapredOutputFormat<K, V>(new JobConf(configuration),
+          mapredOutputFormatClass, mapredInputFormatClass, new JobConf(
               outputCopyingOrInputFormatConfiguration));
     }
     return new MockOutputCollector<K, V>(

http://git-wip-us.apache.org/repos/asf/mrunit/blob/5d6edb33/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
index ac4f1d7..20acd18 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.*;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -34,10 +35,14 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mrunit.ExpectedSuppliedException;
@@ -366,6 +371,43 @@ public class TestMapDriver {
     driver.runTest();
   }
 
+  // tests to ensure counters can be incremented in record writer
+  private static enum RecordCounter {
+    NUM_RECORDS
+  };
+  public static class CountingOutputFormat extends FileOutputFormat<Text, Text> {
+    private final TextOutputFormat<Text, Text> delegate;
+    public CountingOutputFormat() {
+      delegate = new TextOutputFormat<Text, Text>();
+    }
+    @Override
+    public RecordWriter<Text, Text> getRecordWriter(final TaskAttemptContext job)
+        throws IOException, InterruptedException {
+      final RecordWriter<Text, Text> writer = delegate.getRecordWriter(job);
+      return new RecordWriter<Text, Text>() {
+        @Override
+        public void write(Text key, Text value) throws IOException,
+            InterruptedException {
+          job.getCounter(RecordCounter.NUM_RECORDS).increment(1);
+          writer.write(key, value);
+        }
+        @Override
+        public void close(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+          writer.close(context);
+        }        
+      };
+    }    
+  }
+  @Test
+  public void testCountingOutputFormat() throws IOException {
+    driver.withOutputFormat(CountingOutputFormat.class,
+        KeyValueTextInputFormat.class);
+    driver.withInput(new Text("a"), new Text("1"));
+    driver.withOutput(new Text("a"), new Text("1"));
+    driver.withCounter(RecordCounter.NUM_RECORDS, 1);
+    driver.runTest();
+  }
   @Test
   public void testOutputFormat() throws IOException {
     driver.withOutputFormat(SequenceFileOutputFormat.class,