You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrunit.apache.org by br...@apache.org on 2012/08/09 21:13:38 UTC

svn commit: r1371388 - in /mrunit/trunk/src: main/java/org/apache/hadoop/mrunit/ main/java/org/apache/hadoop/mrunit/internal/mapred/ main/java/org/apache/hadoop/mrunit/mapreduce/ test/java/org/apache/hadoop/mrunit/

Author: brock
Date: Thu Aug  9 19:13:38 2012
New Revision: 1371388

URL: http://svn.apache.org/viewvc?rev=1371388&view=rev
Log:
MRUNIT-133: MapInput stuff should be moved from TestDriver to MapDriver

Added:
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java
Modified:
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriver.java Thu Aug  9 19:13:38 2012
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.mrunit;
 
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
 
 import java.io.IOException;
 import java.util.List;
@@ -82,7 +82,7 @@ public class MapDriver<K1, V1, K2, V2> e
     setCounters(ctrs);
     return this;
   }
-
+  
   /**
    * Set the Mapper instance to use with this test driver
    * 

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java Thu Aug  9 19:13:38 2012
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
 import org.apache.hadoop.mrunit.types.Pair;
@@ -41,7 +42,9 @@ public abstract class MapDriverBase<K1, 
   public static final Log LOG = LogFactory.getLog(MapDriverBase.class);
 
   protected List<Pair<K1, V1>> inputs = new ArrayList<Pair<K1, V1>>();
-  
+
+  protected Path mapInputPath = new Path("somefile");
+
   @Deprecated
   protected K1 inputKey;
   @Deprecated
@@ -97,6 +100,7 @@ public abstract class MapDriverBase<K1, 
    * @param inputRecord
    *          a (key, val) pair
    */
+  @SuppressWarnings("deprecation")
   public void setInput(final Pair<K1, V1> inputRecord) {
     setInputKey(inputRecord.getFirst());
     setInputValue(inputRecord.getSecond());
@@ -340,8 +344,33 @@ public abstract class MapDriverBase<K1, 
   }
 
   /**
+   * @return the path passed to the mapper InputSplit
+   */
+  public Path getMapInputPath() {
+    return mapInputPath;
+  }
+
+  /**
+   * @param mapInputPath Path which is to be passed to the mapper's InputSplit
+   */
+  public void setMapInputPath(Path mapInputPath) {
+    this.mapInputPath = mapInputPath;
+  }
+  
+  /**
+   * @param mapInputPath
+   *       The Path object which will be given to the mapper
+   * @return this driver, to allow chaining of calls
+   */
+  public final T withMapInputPath(Path mapInputPath) {
+    setMapInputPath(mapInputPath);
+    return thisAsTestDriver();
+  }
+  
+  /**
    * Handle inputKey and inputVal for backwards compatibility.
    */
+  @SuppressWarnings("deprecation")
   protected void preRunChecks(Object mapper) {
     if (inputKey != null && inputVal != null) {
       setInput(inputKey, inputVal);

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Thu Aug  9 19:13:38 2012
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -51,6 +52,8 @@ public abstract class MapReduceDriverBas
   public static final Log LOG = LogFactory.getLog(MapReduceDriverBase.class);
 
   protected List<Pair<K1, V1>> inputList = new ArrayList<Pair<K1, V1>>();
+  
+  protected Path mapInputPath = new Path("somefile");
 
   /** Key group comparator */
   protected Comparator<K2> keyGroupComparator;
@@ -270,6 +273,30 @@ public abstract class MapReduceDriverBas
     addAllOutput(outputRecords);
     return thisAsMapReduceDriver();
   }
+  
+  /**
+   * @return the path passed to the mapper InputSplit
+   */
+  public Path getMapInputPath() {
+    return mapInputPath;
+  }
+
+  /**
+   * @param mapInputPath Path which is to be passed to the mapper's InputSplit
+   */
+  public void setMapInputPath(Path mapInputPath) {
+    this.mapInputPath = mapInputPath;
+  }
+  
+  /**
+   * @param mapInputPath
+   *       The Path object which will be given to the mapper
+   * @return this driver, to allow chaining of calls
+   */
+  public final T withMapInputPath(Path mapInputPath) {
+    setMapInputPath(mapInputPath);
+    return thisAsTestDriver();
+  }
 
   protected void preRunChecks(Object mapper, Object reducer) {
     if (inputList.isEmpty()) {

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java Thu Aug  9 19:13:38 2012
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
@@ -61,6 +62,8 @@ public class PipelineMapReduceDriver<K1,
   private List<Pair<Mapper, Reducer>> mapReducePipeline;
   private final List<Pair<K1, V1>> inputList;
   private Counters counters;
+  
+  protected Path mapInputPath = new Path("somefile");
 
   public PipelineMapReduceDriver(final List<Pair<Mapper, Reducer>> pipeline) {
     this();
@@ -308,6 +311,30 @@ public class PipelineMapReduceDriver<K1,
     addOutputFromString(output);
     return this;
   }
+  
+  /**
+   * @return the path passed to the mapper InputSplit
+   */
+  public Path getMapInputPath() {
+    return mapInputPath;
+  }
+
+  /**
+   * @param mapInputPath Path which is to be passed to the mapper's InputSplit
+   */
+  public void setMapInputPath(Path mapInputPath) {
+    this.mapInputPath = mapInputPath;
+  }
+  
+  /**
+   * @param mapInputPath
+   *       The Path object which will be given to the mapper
+   * @return this driver, to allow chaining of calls
+   */
+  public final PipelineMapReduceDriver<K1, V1, K2, V2> withMapInputPath(Path mapInputPath) {
+    setMapInputPath(mapInputPath);
+    return this;
+  }
 
   @Override
   @SuppressWarnings("unchecked")
@@ -329,6 +356,7 @@ public class PipelineMapReduceDriver<K1,
 
       mrDriver.setCounters(getCounters());
       mrDriver.setConfiguration(configuration);
+      mrDriver.setMapInputPath(mapInputPath);
 
       // Add the inputs from the user, or from the previous stage of the
       // pipeline.

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java Thu Aug  9 19:13:38 2012
@@ -126,8 +126,7 @@ public class ReduceDriver<K1, V1, K2, V2
           .createOutputCollectable(getConfiguration(),
               getOutputCopyingOrInputFormatConfiguration());
       final MockReporter reporter = new MockReporter(
-          MockReporter.ReporterType.Reducer, getCounters(),
-          getMapInputPath());
+          MockReporter.ReporterType.Reducer, getCounters());
 
       ReflectionUtils.setConf(myReducer, new JobConf(getConfiguration()));
 

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/TestDriver.java Thu Aug  9 19:13:38 2012
@@ -56,8 +56,6 @@ public abstract class TestDriver<K1, V1,
   protected CounterWrapper counterWrapper;
 
   protected Serialization serialization;
-  
-  protected Path mapInputPath = new Path("somefile");
 
   public TestDriver() {
     expectedOutputs = new ArrayList<Pair<K2, V2>>();
@@ -104,7 +102,7 @@ public abstract class TestDriver<K1, V1,
   }
   
   @SuppressWarnings("unchecked")
-  private T thisAsTestDriver() {
+  protected T thisAsTestDriver() {
     return (T) this;
   }
 
@@ -193,30 +191,6 @@ public abstract class TestDriver<K1, V1,
   }
 
   /**
-   * @return the path passed to the mapper InputSplit
-   */
-  public Path getMapInputPath() {
-    return mapInputPath;
-  }
-
-  /**
-   * @param mapInputPath Path which is to be passed to the mappers InputSplit
-   */
-  public void setMapInputPath(Path mapInputPath) {
-    this.mapInputPath = mapInputPath;
-  }
-  
-  /**
-   * @param mapInputPath
-   *       The Path object which will be given to the mapper
-   * @return
-   */
-  public final T withMapInputPath(Path mapInputPath) {
-    setMapInputPath(mapInputPath);
-    return thisAsTestDriver();
-  }
-
-  /**
    * Adds a file to be put on the distributed cache. 
    * The path may be relative and will try to be resolved from
    * the classpath of the test. 

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java Thu Aug  9 19:13:38 2012
@@ -34,11 +34,19 @@ public class MockReporter implements Rep
 
   private final ReporterType typ;
 
+  public MockReporter(final ReporterType kind, final Counters ctrs) {
+    this(kind, ctrs, null);
+  }
+  
   public MockReporter(final ReporterType kind, final Counters ctrs,
       final Path mapInputPath) {
     typ = kind;
     counters = ctrs;
-    inputSplit = new MockInputSplit(mapInputPath);
+    if(mapInputPath == null) {
+      inputSplit = null;
+    } else {
+      inputSplit = new MockInputSplit(mapInputPath);
+    }
   }
 
   @Override

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java Thu Aug  9 19:13:38 2012
@@ -18,14 +18,13 @@
 
 package org.apache.hadoop.mrunit.mapreduce;
 
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
 
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.OutputFormat;

Added: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java?rev=1371388&view=auto
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java (added)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java Thu Aug  9 19:13:38 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+class InputPathStoringMapper<VALUEIN, VALUEOUT> extends MapReduceBase implements
+    Mapper<Text, VALUEIN, Text, VALUEOUT> {
+  private Path mapInputPath;
+
+  @Override
+  public void map(Text key, VALUEIN value, OutputCollector<Text, VALUEOUT> output,
+      Reporter reporter) throws IOException {
+    if (reporter.getInputSplit() instanceof FileSplit) {
+      mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
+    }
+  }
+
+  Path getMapInputPath() {
+    return mapInputPath;
+  }
+}
\ No newline at end of file

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java Thu Aug  9 19:13:38 2012
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
@@ -543,26 +542,9 @@ public class TestMapDriver {
     driver.runTest();
   }
 
-  private static class InputPathStoringMapper extends MapReduceBase implements
-      Mapper<Text, Text, Text, Text> {
-    private Path mapInputPath;
-
-    @Override
-    public void map(Text key, Text value, OutputCollector<Text, Text> output,
-        Reporter reporter) throws IOException {
-      if (reporter.getInputSplit() instanceof FileSplit) {
-        mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
-      }
-    }
-
-    private Path getMapInputPath() {
-      return mapInputPath;
-    }
-  }
-
   @Test
   public void testMapInputFile() throws IOException {
-    InputPathStoringMapper mapper = new InputPathStoringMapper();
+    InputPathStoringMapper<Text, Text> mapper = new InputPathStoringMapper<Text, Text>();
     Path mapInputPath = new Path("myfile");
     driver = MapDriver.newMapDriver(mapper);
     driver.setMapInputPath(mapInputPath);

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java Thu Aug  9 19:13:38 2012
@@ -566,27 +566,10 @@ public class TestMapReduceDriver {
     driver.runTest();
   }
 
-  private static class InputPathStoringMapper extends MapReduceBase implements
-      Mapper<Text, LongWritable, Text, LongWritable> {
-    private Path mapInputPath;
-
-    @Override
-    public void map(Text key, LongWritable value,
-        OutputCollector<Text, LongWritable> output, Reporter reporter)
-        throws IOException {
-      if (reporter.getInputSplit() instanceof FileSplit) {
-        mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
-      }
-    }
-
-    private Path getMapInputPath() {
-      return mapInputPath;
-    }
-  }
-
   @Test
   public void testMapInputFile() throws IOException {
-    InputPathStoringMapper mapper = new InputPathStoringMapper();
+    InputPathStoringMapper<LongWritable,LongWritable> mapper = 
+        new InputPathStoringMapper<LongWritable,LongWritable>();
     Path mapInputPath = new Path("myfile");
     driver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
     driver.setMapInputPath(mapInputPath);

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java?rev=1371388&r1=1371387&r2=1371388&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java Thu Aug  9 19:13:38 2012
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.mrunit;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -306,4 +309,20 @@ public class TestPipelineMapReduceDriver
     driver.addOutput(key, value);
     driver.runTest();
   }
+  
+  @Test
+  public void testMapInputFile() throws IOException {
+    InputPathStoringMapper<LongWritable,LongWritable> mapper = 
+        new InputPathStoringMapper<LongWritable,LongWritable>();
+    Path mapInputPath = new Path("myfile");
+    final PipelineMapReduceDriver<Text, LongWritable, Text, LongWritable> driver = PipelineMapReduceDriver
+        .newPipelineMapReduceDriver();
+    driver.addMapReduce(mapper, new IdentityReducer<Text, LongWritable>());
+    driver.setMapInputPath(mapInputPath);
+    assertEquals(mapInputPath.getName(), driver.getMapInputPath().getName());
+    driver.withInput(new Text("a"), new LongWritable(1));
+    driver.runTest();
+    assertNotNull(mapper.getMapInputPath());
+    assertEquals(mapInputPath.getName(), mapper.getMapInputPath().getName());
+  }
 }