You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by rs...@apache.org on 2012/10/19 08:54:01 UTC

[2/2] git commit: CRUNCH-93: Moving test support classes

CRUNCH-93: Moving test support classes

- Moved InMemoryEmitter to o.a.c.mem.emit package
- Created a TestContext in CrunchTestSupport class for unit testing of
  various components
- Deleted unit test code from DoFn e.g. configurationForTest
- All things are retrived from context
- Implement InMemorycontext that has only config elements for MemoryPipeline

Signed-off-by: Rahul Sharma <rs...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1cee024c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1cee024c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1cee024c

Branch: refs/heads/master
Commit: 1cee024c65938606d0d5794fbc4d10a1135f0f65
Parents: 14132b0
Author: Rahul Sharma <rs...@apache.org>
Authored: Fri Oct 12 21:40:52 2012 +0530
Committer: Rahul Sharma <rs...@apache.org>
Committed: Fri Oct 19 11:01:52 2012 +0530

----------------------------------------------------------------------
 crunch-test/pom.xml                                |    5 +
 .../org/apache/crunch/test/CrunchTestSupport.java  |   70 ++++++++++++++-
 .../java/org/apache/crunch/test/TestCounters.java  |   41 +++++++++
 .../apache/crunch/test/CrunchTestSupportTest.java  |   53 +++++++++++
 crunch/pom.xml                                     |    7 ++
 crunch/src/main/java/org/apache/crunch/DoFn.java   |   50 ++---------
 .../java/org/apache/crunch/fn/CompositeMapFn.java  |    6 --
 .../main/java/org/apache/crunch/fn/PairMapFn.java  |    5 -
 .../crunch/impl/mem/collect/MemCollection.java     |   59 ++++++++++++-
 .../crunch/impl/mem/emit/InMemoryEmitter.java      |   57 ++++++++++++
 .../crunch/io/avro/AvroFileReaderFactory.java      |    1 -
 .../apache/crunch/io/seq/SeqFileReaderFactory.java |    1 -
 .../crunch/io/seq/SeqFileTableReaderFactory.java   |    2 -
 .../crunch/io/text/TextFileReaderFactory.java      |    1 -
 .../org/apache/crunch/test/InMemoryEmitter.java    |   57 ------------
 .../java/org/apache/crunch/test/TestCounters.java  |   41 ---------
 .../apache/crunch/types/avro/AvroTableType.java    |   12 ---
 .../java/org/apache/crunch/types/avro/Avros.java   |   34 -------
 .../apache/crunch/types/writable/Writables.java    |   31 -------
 .../org/apache/crunch/lib/join/JoinFnTestBase.java |    9 +-
 .../java/org/apache/crunch/test/CountersTest.java  |   11 +--
 .../org/apache/crunch/types/avro/AvrosTest.java    |   28 ++++--
 22 files changed, 321 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-test/pom.xml b/crunch-test/pom.xml
index 6caf8ed..6378f31 100644
--- a/crunch-test/pom.xml
+++ b/crunch-test/pom.xml
@@ -59,6 +59,11 @@ under the License.
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
+    
+    <dependency>
+       <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.hamcrest</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java b/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java
index 7f74931..fc1a77a 100644
--- a/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java
+++ b/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java
@@ -17,14 +17,76 @@
  */
 package org.apache.crunch.test;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.junit.Rule;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
- * A temporary workaround for Scala tests to use when working with Rule annotations
- * until it gets fixed in JUnit 4.11.
+ * A temporary workaround for Scala tests to use when working with Rule
+ * annotations until it gets fixed in JUnit 4.11.
  */
 public class CrunchTestSupport {
   @Rule
-  public TemporaryPath tempDir = new TemporaryPath(
-      "crunch.tmp.dir", "hadoop.tmp.dir");
+  public TemporaryPath tempDir = new TemporaryPath("crunch.tmp.dir", "hadoop.tmp.dir");
+
+  /**
+   * The method creates a {@linkplain TaskInputOutputContext} which can be used
+   * in unit tests. The context servers very limited purpose. You can only
+   * operate with counters, taskAttempId ,status and configuration while using
+   * this context.
+   */
+  public static <KI, VI, KO, VO> TaskInputOutputContext<KI, VI, KO, VO> getTestContext(final Configuration config) {
+    TaskInputOutputContext<KI, VI, KO, VO> context = Mockito.mock(TaskInputOutputContext.class);
+    TestCounters.clearCounters();
+    final StateHolder holder = new StateHolder();
+
+    Mockito.when(context.getCounter(Mockito.any(Enum.class))).then(new Answer<Counter>() {
+      @Override
+      public Counter answer(InvocationOnMock invocation) throws Throwable {
+        Enum<?> counter = (Enum<?>) invocation.getArguments()[0];
+        return TestCounters.getCounter(counter);
+      }
+
+    });
+
+    Mockito.when(context.getCounter(Mockito.anyString(), Mockito.anyString())).then(new Answer<Counter>() {
+      @Override
+      public Counter answer(InvocationOnMock invocation) throws Throwable {
+        String group = (String) invocation.getArguments()[0];
+        String name = (String) invocation.getArguments()[0];
+        return TestCounters.getCounter(group, name);
+      }
+
+    });
+
+    Mockito.when(context.getConfiguration()).thenReturn(config);
+    Mockito.when(context.getTaskAttemptID()).thenReturn(new TaskAttemptID());
+
+    Mockito.when(context.getStatus()).then(new Answer<String>() {
+      @Override
+      public String answer(InvocationOnMock invocation) throws Throwable {
+        return holder.internalStatus;
+      }
+    });
+
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        holder.internalStatus = (String) invocation.getArguments()[0];
+        return null;
+      }
+    }).when(context).setStatus(Mockito.anyString());
+
+    return context;
+
+  }
+
+  static class StateHolder {
+    private String internalStatus;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java b/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java
new file mode 100644
index 0000000..bcb4da1
--- /dev/null
+++ b/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+
+/**
+ * A utility class used during unit testing to update and read counters.
+ */
+public class TestCounters {
+
+  private static Counters COUNTERS = new Counters();
+
+  public static Counter getCounter(Enum<?> e) {
+    return COUNTERS.findCounter(e);
+  }
+
+  public static Counter getCounter(String group, String name) {
+    return COUNTERS.findCounter(group, name);
+  }
+
+  public static void clearCounters() {
+    COUNTERS = new Counters();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java
----------------------------------------------------------------------
diff --git a/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java b/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java
new file mode 100644
index 0000000..65b86d9
--- /dev/null
+++ b/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.junit.Test;
+
+public class CrunchTestSupportTest {
+
+  enum CT {
+    ONE,
+  };
+
+  @Test
+  public void testContext() {
+    Configuration sampleConfig = new Configuration();
+    String status = "test";
+    TaskInputOutputContext<?, ?, ?, ?> testContext = CrunchTestSupport.getTestContext(sampleConfig);
+    assertEquals(sampleConfig, testContext.getConfiguration());
+    TaskAttemptID taskAttemptID = testContext.getTaskAttemptID();
+    assertEquals(taskAttemptID, testContext.getTaskAttemptID());
+    assertNotNull(taskAttemptID);
+    assertNull(testContext.getStatus());
+    testContext.setStatus(status);
+    assertEquals(status, testContext.getStatus());
+    assertEquals(0, testContext.getCounter(CT.ONE).getValue());
+    testContext.getCounter(CT.ONE).increment(1);
+    assertEquals(1, testContext.getCounter(CT.ONE).getValue());
+    testContext.getCounter(CT.ONE).increment(4);
+    assertEquals(5, testContext.getCounter(CT.ONE).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch/pom.xml b/crunch/pom.xml
index 5dcc489..39dcc75 100644
--- a/crunch/pom.xml
+++ b/crunch/pom.xml
@@ -75,6 +75,13 @@ under the License.
       <artifactId>jackson-mapper-asl</artifactId>
       <scope>provided</scope>
     </dependency>
+    
+    <dependency>
+      <groupId>javassist</groupId>
+      <artifactId>javassist</artifactId>
+      <version>3.12.1.GA</version>
+    </dependency>
+            
 
     <!-- Both Protobufs and Thrift are supported as
          derived serialization types, and you can use

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/DoFn.java b/crunch/src/main/java/org/apache/crunch/DoFn.java
index 7d516de..8d7cc17 100644
--- a/crunch/src/main/java/org/apache/crunch/DoFn.java
+++ b/crunch/src/main/java/org/apache/crunch/DoFn.java
@@ -19,7 +19,6 @@ package org.apache.crunch;
 
 import java.io.Serializable;
 
-import org.apache.crunch.test.TestCounters;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -38,8 +37,6 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
  */
 public abstract class DoFn<S, T> implements Serializable {
   private transient TaskInputOutputContext<?, ?, ?, ?> context;
-  private transient Configuration testConf;
-  private transient String internalStatus;
 
   /**
    * Configure this DoFn. Subclasses may override this method to modify the
@@ -61,8 +58,8 @@ public abstract class DoFn<S, T> implements Serializable {
    * this method to do appropriate initialization.
    * 
    * <p>
-   * Called during the setup of the job instance this {@code DoFn} is
-   * associated with.
+   * Called during the setup of the job instance this {@code DoFn} is associated
+   * with.
    * </p>
    * 
    */
@@ -110,16 +107,6 @@ public abstract class DoFn<S, T> implements Serializable {
   }
 
   /**
-   * Sets a {@code Configuration} instance to be used during unit tests.
-   * 
-   * @param conf
-   *          The Configuration instance.
-   */
-  public void setConfigurationForTest(Configuration conf) {
-    this.testConf = conf;
-  }
-
-  /**
    * Returns an estimate of how applying this function to a {@link PCollection}
    * will cause it to change in side. The optimizer uses these estimates to
    * decide where to break up dependent MR jobs into separate Map and Reduce
@@ -138,25 +125,14 @@ public abstract class DoFn<S, T> implements Serializable {
   }
 
   protected Configuration getConfiguration() {
-    if (context != null) {
-      return context.getConfiguration();
-    } else if (testConf != null) {
-      return testConf;
-    }
-    return null;
+    return context.getConfiguration();
   }
 
   protected Counter getCounter(Enum<?> counterName) {
-    if (context == null) {
-      return TestCounters.getCounter(counterName);
-    }
     return context.getCounter(counterName);
   }
 
   protected Counter getCounter(String groupName, String counterName) {
-    if (context == null) {
-      return TestCounters.getCounter(groupName, counterName);
-    }
     return context.getCounter(groupName, counterName);
   }
 
@@ -169,31 +145,19 @@ public abstract class DoFn<S, T> implements Serializable {
   }
 
   protected void progress() {
-    if (context != null) {
-      context.progress();
-    }
+    context.progress();
   }
 
   protected TaskAttemptID getTaskAttemptID() {
-    if (context != null) {
-      return context.getTaskAttemptID();
-    } else {
-      return new TaskAttemptID();
-    }
+    return context.getTaskAttemptID();
   }
 
   protected void setStatus(String status) {
-    if (context != null) {
-      context.setStatus(status);
-    }
-    this.internalStatus = status;
+    context.setStatus(status);
   }
 
   protected String getStatus() {
-    if (context != null) {
-      return context.getStatus();
-    }
-    return internalStatus;
+    return context.getStatus();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
index a41daf4..4714fe4 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
@@ -61,10 +61,4 @@ public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
     first.configure(conf);
     second.configure(conf);
   }
-
-  @Override
-  public void setConfigurationForTest(Configuration conf) {
-    first.setConfigurationForTest(conf);
-    second.setConfigurationForTest(conf);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
index 63634ef..b25a6d8 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
@@ -55,9 +55,4 @@ public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
     values.cleanup(null);
   }
 
-  @Override
-  public void setConfigurationForTest(Configuration conf) {
-    keys.setConfigurationForTest(conf);
-    values.setConfigurationForTest(conf);
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index a79ec2b..61bb1e7 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -17,8 +17,13 @@
  */
 package org.apache.crunch.impl.mem.collect;
 
+import java.lang.reflect.Method;
 import java.util.Collection;
 
+import javassist.util.proxy.MethodFilter;
+import javassist.util.proxy.MethodHandler;
+import javassist.util.proxy.ProxyFactory;
+
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.MapFn;
@@ -30,14 +35,20 @@ import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
 import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Sample;
 import org.apache.crunch.lib.Sort;
 import org.apache.crunch.materialize.pobject.CollectionPObject;
-import org.apache.crunch.test.InMemoryEmitter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -103,6 +114,7 @@ public class MemCollection<S> implements PCollection<S> {
   @Override
   public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
     InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
+    doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
     doFn.initialize();
     for (S s : collect) {
       doFn.process(s, emitter);
@@ -214,4 +226,49 @@ public class MemCollection<S> implements PCollection<S> {
   public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
     return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
   }
+
+  /**
+   * The method creates a {@link TaskInputOutputContext} that will just provide
+   * {@linkplain Configuration}. The method has been implemented with javaassist
+   * as there are API changes in versions of Hadoop. In hadoop 1.0.3 the
+   * {@linkplain TaskInputOutputContext} is abstract class while in version 2
+   * the same is an interface.
+   * <p>
+   * Note: The intention of this is to provide the bare essentials that are
+   * required to make the {@linkplain MemPipeline} work. It lacks even the basic
+   * things that can proved some support for unit testing pipeline.
+   */
+  private static TaskInputOutputContext<?, ?, ?, ?> getInMemoryContext(final Configuration conf) {
+    ProxyFactory factory = new ProxyFactory();
+    Class<TaskInputOutputContext> superType = TaskInputOutputContext.class;
+    Class[] types = new Class[0];
+    Object[] args = new Object[0];
+    if (superType.isInterface()) {
+      factory.setInterfaces(new Class[] { superType });
+    } else {
+      types = new Class[] { Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class,
+          StatusReporter.class };
+      args = new Object[] { conf, new TaskAttemptID(), null, null, null };
+      factory.setSuperclass(superType);
+    }
+    factory.setFilter(new MethodFilter() {
+      @Override
+      public boolean isHandled(Method m) {
+        return m.getName().equals("getConfiguration");
+      }
+    });
+    MethodHandler handler = new MethodHandler() {
+      @Override
+      public Object invoke(Object arg0, Method arg1, Method arg2, Object[] arg3) throws Throwable {
+        return conf;
+      }
+    };
+    try {
+      Object newInstance = factory.create(types, args, handler);
+      return (TaskInputOutputContext<?, ?, ?, ?>) newInstance;
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
new file mode 100644
index 0000000..6976615
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.emit;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+
+import com.google.common.collect.Lists;
+
+/**
+ * An {@code Emitter} instance that writes emitted records to a backing
+ * {@code List}.
+ * 
+ * @param <T>
+ */
+public class InMemoryEmitter<T> implements Emitter<T> {
+
+  private final List<T> output;
+
+  public InMemoryEmitter() {
+    this(Lists.<T> newArrayList());
+  }
+
+  public InMemoryEmitter(List<T> output) {
+    this.output = output;
+  }
+
+  @Override
+  public void emit(T emitted) {
+    output.add(emitted);
+  }
+
+  @Override
+  public void flush() {
+
+  }
+
+  public List<T> getOutput() {
+    return output;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index d1940cc..6f21dd2 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -69,7 +69,6 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
 
   @Override
   public Iterator<T> read(FileSystem fs, final Path path) {
-    this.mapFn.setConfigurationForTest(conf);
     this.mapFn.initialize();
     try {
       FsInput fsi = new FsInput(path, fs.getConf());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
index 050c0fc..ad1b81b 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
@@ -54,7 +54,6 @@ public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
 
   @Override
   public Iterator<T> read(FileSystem fs, final Path path) {
-    mapFn.setConfigurationForTest(conf);
     mapFn.initialize();
     try {
       final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
index 7c34a75..20c749a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
@@ -59,9 +59,7 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K
 
   @Override
   public Iterator<Pair<K, V>> read(FileSystem fs, final Path path) {
-    keyMapFn.setConfigurationForTest(conf);
     keyMapFn.initialize();
-    valueMapFn.setConfigurationForTest(conf);
     valueMapFn.initialize();
     try {
       final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
index 5a512fc..a0c48e0 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
@@ -63,7 +63,6 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
         mapFn = ((CompositeMapFn) input).getSecond();
       }
     }
-    mapFn.setConfigurationForTest(conf);
     mapFn.initialize();
 
     FSDataInputStream is;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java b/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
deleted file mode 100644
index 1e0acb9..0000000
--- a/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.test;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-
-import com.google.common.collect.Lists;
-
-/**
- * An {@code Emitter} instance that writes emitted records to a backing
- * {@code List}.
- * 
- * @param <T>
- */
-public class InMemoryEmitter<T> implements Emitter<T> {
-
-  private final List<T> output;
-
-  public InMemoryEmitter() {
-    this(Lists.<T> newArrayList());
-  }
-
-  public InMemoryEmitter(List<T> output) {
-    this.output = output;
-  }
-
-  @Override
-  public void emit(T emitted) {
-    output.add(emitted);
-  }
-
-  @Override
-  public void flush() {
-
-  }
-
-  public List<T> getOutput() {
-    return output;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/test/TestCounters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/test/TestCounters.java b/crunch/src/main/java/org/apache/crunch/test/TestCounters.java
deleted file mode 100644
index bcb4da1..0000000
--- a/crunch/src/main/java/org/apache/crunch/test/TestCounters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.test;
-
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-
-/**
- * A utility class used during unit testing to update and read counters.
- */
-public class TestCounters {
-
-  private static Counters COUNTERS = new Counters();
-
-  public static Counter getCounter(Enum<?> e) {
-    return COUNTERS.findCounter(e);
-  }
-
-  public static Counter getCounter(String group, String name) {
-    return COUNTERS.findCounter(group, name);
-  }
-
-  public static void clearCounters() {
-    COUNTERS = new Counters();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index 285b423..5416c4f 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -57,12 +57,6 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      keyMapFn.setConfigurationForTest(conf);
-      valueMapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
     public void initialize() {
       keyMapFn.setContext(getContext());
       valueMapFn.setContext(getContext());
@@ -99,12 +93,6 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      firstMapFn.setConfigurationForTest(conf);
-      secondMapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
     public void initialize() {
       firstMapFn.setContext(getContext());
       secondMapFn.setContext(getContext());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index 969af1d..4a83db5 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -291,11 +291,6 @@ public class Avros {
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
     public void initialize() {
       this.mapFn.setContext(getContext());
     }
@@ -335,11 +330,6 @@ public class Avros {
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
     public void initialize() {
       this.mapFn.setContext(getContext());
     }
@@ -378,11 +368,6 @@ public class Avros {
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
     public void initialize() {
       this.mapFn.setContext(getContext());
     }
@@ -410,11 +395,6 @@ public class Avros {
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
     public void initialize() {
       this.mapFn.setContext(getContext());
     }
@@ -460,13 +440,6 @@ public class Avros {
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.setConfigurationForTest(conf);
-      }
-    }
-
-    @Override
     public void initialize() {
       for (MapFn fn : fns) {
         fn.setContext(getContext());
@@ -527,13 +500,6 @@ public class Avros {
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.setConfigurationForTest(conf);
-      }
-    }
-
-    @Override
     public void initialize() {
       this.schema = new Schema.Parser().parse(jsonSchema);
       for (MapFn fn : fns) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
index 23bc7f5..5e305b8 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -297,13 +297,6 @@ public class Writables {
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.setConfigurationForTest(conf);
-      }
-    }
-
-    @Override
     public void initialize() {
       for (MapFn fn : fns) {
         fn.setContext(getContext());
@@ -353,12 +346,6 @@ public class Writables {
       }
     }
 
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.setConfigurationForTest(conf);
-      }
-    }
 
     @Override
     public void initialize() {
@@ -439,10 +426,6 @@ public class Writables {
       mapFn.configure(conf);
     }
 
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
 
     @Override
     public void initialize() {
@@ -474,10 +457,6 @@ public class Writables {
       mapFn.configure(conf);
     }
 
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
 
     @Override
     public void initialize() {
@@ -516,11 +495,6 @@ public class Writables {
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
     public void initialize() {
       mapFn.setContext(getContext());
     }
@@ -551,11 +525,6 @@ public class Writables {
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
     public void initialize() {
       mapFn.setContext(getContext());
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
index 741899e..9e4337f 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
+import org.apache.crunch.test.CrunchTestSupport;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
@@ -41,7 +42,7 @@ public abstract class JoinFnTestBase {
   @Before
   public void setUp() {
     joinFn = getJoinFn();
-    joinFn.setConfigurationForTest(new Configuration());
+    joinFn.setContext(CrunchTestSupport.getTestContext(new Configuration()));
     joinFn.initialize();
     emitter = mock(Emitter.class);
   }
@@ -67,13 +68,11 @@ public abstract class JoinFnTestBase {
 
   }
 
-  protected abstract void checkOutput(
-      Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter);
+  protected abstract void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter);
 
   protected abstract JoinFn<StringWrapper, StringWrapper, String> getJoinFn();
 
-  protected List<Pair<StringWrapper, String>> createValuePairList(StringWrapper leftValue,
-      String rightValue) {
+  protected List<Pair<StringWrapper, String>> createValuePairList(StringWrapper leftValue, String rightValue) {
     Pair<StringWrapper, String> valuePair = Pair.of(leftValue, rightValue);
     List<Pair<StringWrapper, String>> valuePairList = Lists.newArrayList();
     valuePairList.add(valuePair);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/test/CountersTest.java b/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
index 3df7657..66f854e 100644
--- a/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
+++ b/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
-import org.junit.After;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 /**
@@ -35,12 +35,11 @@ public class CountersTest {
     THREE
   };
 
-  @After
-  public void after() {
-    TestCounters.clearCounters();
-  }
-
   public static class CTFn extends DoFn<String, String> {
+    CTFn() {
+      setContext(CrunchTestSupport.getTestContext(new Configuration()));
+    }
+
     @Override
     public void process(String input, Emitter<String> emitter) {
       getCounter(CT.ONE).increment(1);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
index 801829d..dabf0fe 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
@@ -37,13 +37,16 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
+import org.apache.crunch.test.CrunchTestSupport;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.types.DeepCopier;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -186,6 +189,11 @@ public class AvrosTest {
   @SuppressWarnings("rawtypes")
   public void testWritables() throws Exception {
     AvroType at = Avros.writables(LongWritable.class);
+    
+    TaskInputOutputContext<?, ?, ?, ?> testContext = CrunchTestSupport.getTestContext(new Configuration());
+    at.getInputMapFn().setContext(testContext);
+    at.getOutputMapFn().setContext(testContext);
+
     LongWritable lw = new LongWritable(1729L);
     assertEquals(lw, at.getInputMapFn().map(at.getOutputMapFn().map(lw)));
   }
@@ -222,9 +230,9 @@ public class AvrosTest {
   public void testIsPrimitive_PrimitiveMappedType() {
     assertTrue(Avros.isPrimitive(Avros.ints()));
   }
-  
+
   @Test
-  public void testIsPrimitive_TruePrimitiveValue(){
+  public void testIsPrimitive_TruePrimitiveValue() {
     AvroType truePrimitiveAvroType = new AvroType(int.class, Schema.create(Type.INT), new DeepCopier.NoOpDeepCopier());
     assertTrue(Avros.isPrimitive(truePrimitiveAvroType));
   }
@@ -294,20 +302,20 @@ public class AvrosTest {
     assertEquals(pair, doubleMappedPair);
 
   }
-  
+
   @Test
-  public void testPairOutputMapFn_VerifyNoObjectReuse(){
+  public void testPairOutputMapFn_VerifyNoObjectReuse() {
     StringWrapper stringWrapper = new StringWrapper("Test");
-    
-    Pair<Integer,StringWrapper> pair = Pair.of(1, stringWrapper);
-    
+
+    Pair<Integer, StringWrapper> pair = Pair.of(1, stringWrapper);
+
     AvroType<Pair<Integer, StringWrapper>> pairType = Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class));
-    
+
     pairType.getOutputMapFn().initialize();
-    
+
     Object outputMappedValueA = pairType.getOutputMapFn().map(pair);
     Object outputMappedValueB = pairType.getOutputMapFn().map(pair);
-    
+
     assertEquals(outputMappedValueA, outputMappedValueB);
     assertNotSame(outputMappedValueA, outputMappedValueB);
   }