You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:25 UTC

[23/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java
new file mode 100644
index 0000000..7e2e444
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class BrokenLeftAndOuterJoinTest {
+
+  List<Pair<StringWrapper, String>> createValuePairList(StringWrapper leftValue, String rightValue) {
+    Pair<StringWrapper, String> valuePair = Pair.of(leftValue, rightValue);
+    List<Pair<StringWrapper, String>> valuePairList = Lists.newArrayList();
+    valuePairList.add(valuePair);
+    return valuePairList;
+  }
+  
+  @Test
+  public void testOuterJoin() {
+    JoinFn<StringWrapper, StringWrapper, String> joinFn = new LeftOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+    joinFn.setContext(CrunchTestSupport.getTestContext(new Configuration()));
+    joinFn.initialize();
+    Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter = mock(Emitter.class);
+    
+    StringWrapper key = new StringWrapper();
+    StringWrapper leftValue = new StringWrapper();
+    key.setValue("left-only");
+    leftValue.setValue("left-only-left");
+    joinFn.join(key, 0, createValuePairList(leftValue, null), emitter);
+
+    key.setValue("right-only");
+    joinFn.join(key, 1, createValuePairList(null, "right-only-right"), emitter);
+
+    verify(emitter).emit(Pair.of(wrap("left-only"), Pair.of(wrap("left-only-left"), (String) null)));
+    verifyNoMoreInteractions(emitter);
+  }
+  
+  @Test
+  public void testFullJoin() {
+    JoinFn<StringWrapper, StringWrapper, String> joinFn = new FullOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+    joinFn.setContext(CrunchTestSupport.getTestContext(new Configuration()));
+    joinFn.initialize();
+    Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter = mock(Emitter.class);
+    
+    StringWrapper key = new StringWrapper();
+    StringWrapper leftValue = new StringWrapper();
+    key.setValue("left-only");
+    leftValue.setValue("left-only-left");
+    joinFn.join(key, 0, createValuePairList(leftValue, null), emitter);
+
+    key.setValue("right-only");
+    joinFn.join(key, 1, createValuePairList(null, "right-only-right"), emitter);
+
+    verify(emitter).emit(Pair.of(wrap("left-only"), Pair.of(wrap("left-only-left"), (String) null)));
+    verify(emitter).emit(Pair.of(wrap("right-only"), Pair.of((StringWrapper)null, "right-only-right")));
+    verifyNoMoreInteractions(emitter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java
new file mode 100644
index 0000000..5cf4f51
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/join/FullOuterJoinFnTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+
+public class FullOuterJoinFnTest extends JoinFnTestBase {
+
+  @Override
+  protected void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter) {
+    verify(emitter)
+        .emit(Pair.of(wrap("left-only"), Pair.of(wrap("left-only-left"), (String) null)));
+    verify(emitter).emit(Pair.of(wrap("both"), Pair.of(wrap("both-left"), "both-right")));
+    verify(emitter).emit(
+        Pair.of(wrap("right-only"), Pair.of((StringWrapper) null, "right-only-right")));
+    verifyNoMoreInteractions(emitter);
+  }
+
+  @Override
+  protected JoinFn<StringWrapper, StringWrapper, String> getJoinFn() {
+    return new FullOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/join/InnerJoinFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/join/InnerJoinFnTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/join/InnerJoinFnTest.java
new file mode 100644
index 0000000..d2347de
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/join/InnerJoinFnTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+
+public class InnerJoinFnTest extends JoinFnTestBase {
+
+  protected void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> joinEmitter) {
+    verify(joinEmitter).emit(Pair.of(wrap("both"), Pair.of(wrap("both-left"), "both-right")));
+    verifyNoMoreInteractions(joinEmitter);
+  }
+
+  @Override
+  protected JoinFn<StringWrapper, StringWrapper, String> getJoinFn() {
+    return new InnerJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java b/crunch-core/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
new file mode 100644
index 0000000..9e4337f
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public abstract class JoinFnTestBase {
+
+  private JoinFn<StringWrapper, StringWrapper, String> joinFn;
+
+  private Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter;
+
+  // Avoid warnings on generic Emitter mock
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setUp() {
+    joinFn = getJoinFn();
+    joinFn.setContext(CrunchTestSupport.getTestContext(new Configuration()));
+    joinFn.initialize();
+    emitter = mock(Emitter.class);
+  }
+
+  @Test
+  public void testJoin() {
+
+    StringWrapper key = new StringWrapper();
+    StringWrapper leftValue = new StringWrapper();
+    key.setValue("left-only");
+    leftValue.setValue("left-only-left");
+    joinFn.join(key, 0, createValuePairList(leftValue, null), emitter);
+
+    key.setValue("both");
+    leftValue.setValue("both-left");
+    joinFn.join(key, 0, createValuePairList(leftValue, null), emitter);
+    joinFn.join(key, 1, createValuePairList(null, "both-right"), emitter);
+
+    key.setValue("right-only");
+    joinFn.join(key, 1, createValuePairList(null, "right-only-right"), emitter);
+
+    checkOutput(emitter);
+
+  }
+
+  protected abstract void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter);
+
+  protected abstract JoinFn<StringWrapper, StringWrapper, String> getJoinFn();
+
+  protected List<Pair<StringWrapper, String>> createValuePairList(StringWrapper leftValue, String rightValue) {
+    Pair<StringWrapper, String> valuePair = Pair.of(leftValue, rightValue);
+    List<Pair<StringWrapper, String>> valuePairList = Lists.newArrayList();
+    valuePairList.add(valuePair);
+    return valuePairList;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java
new file mode 100644
index 0000000..a90457e
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+
+public class LeftOuterJoinTest extends JoinFnTestBase {
+
+  @Override
+  protected void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter) {
+    verify(emitter)
+        .emit(Pair.of(wrap("left-only"), Pair.of(wrap("left-only-left"), (String) null)));
+    verify(emitter).emit(Pair.of(wrap("both"), Pair.of(wrap("both-left"), "both-right")));
+    verifyNoMoreInteractions(emitter);
+  }
+
+  @Override
+  protected JoinFn<StringWrapper, StringWrapper, String> getJoinFn() {
+    return new LeftOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/lib/join/RightOuterJoinFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/join/RightOuterJoinFnTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/join/RightOuterJoinFnTest.java
new file mode 100644
index 0000000..7e41284
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/join/RightOuterJoinFnTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+
+public class RightOuterJoinFnTest extends JoinFnTestBase {
+
+  @Override
+  protected void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter) {
+    verify(emitter).emit(Pair.of(wrap("both"), Pair.of(wrap("both-left"), "both-right")));
+    verify(emitter).emit(
+        Pair.of(wrap("right-only"), Pair.of((StringWrapper) null, "right-only-right")));
+    verifyNoMoreInteractions(emitter);
+  }
+
+  @Override
+  protected JoinFn<StringWrapper, StringWrapper, String> getJoinFn() {
+    return new RightOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/test/CountersTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/test/CountersTest.java b/crunch-core/src/test/java/org/apache/crunch/test/CountersTest.java
new file mode 100644
index 0000000..66f854e
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/test/CountersTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+/**
+ * A test to verify using counters inside of a unit test works. :)
+ */
+public class CountersTest {
+
+  public enum CT {
+    ONE,
+    TWO,
+    THREE
+  };
+
+  public static class CTFn extends DoFn<String, String> {
+    CTFn() {
+      setContext(CrunchTestSupport.getTestContext(new Configuration()));
+    }
+
+    @Override
+    public void process(String input, Emitter<String> emitter) {
+      getCounter(CT.ONE).increment(1);
+      getCounter(CT.TWO).increment(4);
+      getCounter(CT.THREE).increment(7);
+    }
+  }
+
+  @Test
+  public void test() {
+    CTFn fn = new CTFn();
+    fn.process("foo", null);
+    fn.process("bar", null);
+    assertEquals(2L, TestCounters.getCounter(CT.ONE).getValue());
+    assertEquals(8L, TestCounters.getCounter(CT.TWO).getValue());
+    assertEquals(14L, TestCounters.getCounter(CT.THREE).getValue());
+  }
+
+  @Test
+  public void secondTest() {
+    CTFn fn = new CTFn();
+    fn.process("foo", null);
+    fn.process("bar", null);
+    assertEquals(2L, TestCounters.getCounter(CT.ONE).getValue());
+    assertEquals(8L, TestCounters.getCounter(CT.TWO).getValue());
+    assertEquals(14L, TestCounters.getCounter(CT.THREE).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/test/StringWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/test/StringWrapper.java b/crunch-core/src/test/java/org/apache/crunch/test/StringWrapper.java
new file mode 100644
index 0000000..34302b5
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/test/StringWrapper.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import org.apache.crunch.MapFn;
+
+/**
+ * Simple String wrapper for testing with Avro reflection.
+ */
+public class StringWrapper implements Comparable<StringWrapper> {
+
+  public static class StringToStringWrapperMapFn extends MapFn<String, StringWrapper> {
+
+    @Override
+    public StringWrapper map(String input) {
+      return wrap(input);
+    }
+
+  }
+
+  public static class StringWrapperToStringMapFn extends MapFn<StringWrapper, String> {
+
+    @Override
+    public String map(StringWrapper input) {
+      return input.getValue();
+    }
+
+  }
+
+  private String value;
+
+  public StringWrapper() {
+    this("");
+  }
+
+  public StringWrapper(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public int compareTo(StringWrapper o) {
+    return this.value.compareTo(o.value);
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public void setValue(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    StringWrapper other = (StringWrapper) obj;
+    if (value == null) {
+      if (other.value != null)
+        return false;
+    } else if (!value.equals(other.value))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "StringWrapper [value=" + value + "]";
+  }
+
+  public static StringWrapper wrap(String value) {
+    return new StringWrapper(value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
new file mode 100644
index 0000000..bd7fcd7
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collection;
+
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class CollectionDeepCopierTest {
+
+  @Test
+  public void testDeepCopy() {
+    Person person = new Person();
+    person.age = 42;
+    person.name = "John Smith";
+    person.siblingnames = Lists.<CharSequence> newArrayList();
+
+    Collection<Person> personCollection = Lists.newArrayList(person);
+    CollectionDeepCopier<Person> collectionDeepCopier = new CollectionDeepCopier<Person>(
+        Avros.records(Person.class));
+    collectionDeepCopier.initialize(new Configuration());
+
+    Collection<Person> deepCopyCollection = collectionDeepCopier.deepCopy(personCollection);
+
+    assertEquals(personCollection, deepCopyCollection);
+    assertNotSame(personCollection.iterator().next(), deepCopyCollection.iterator().next());
+  }
+
+  @Test
+  public void testNullDeepCopy() {
+    CollectionDeepCopier<Person> collectionDeepCopier = new CollectionDeepCopier<Person>(
+        Avros.records(Person.class));
+    collectionDeepCopier.initialize(new Configuration());
+    Collection<Person> nullCollection = null;
+    assertNull(collectionDeepCopier.deepCopy(nullCollection));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java
new file mode 100644
index 0000000..c13e4a2
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+
+import java.util.Map;
+
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class MapDeepCopierTest {
+
+  @Test
+  public void testDeepCopy() {
+    StringWrapper stringWrapper = new StringWrapper("value");
+    String key = "key";
+    Map<String, StringWrapper> map = Maps.newHashMap();
+    map.put(key, stringWrapper);
+
+    MapDeepCopier<StringWrapper> deepCopier = new MapDeepCopier<StringWrapper>(
+        Avros.reflects(StringWrapper.class));
+    deepCopier.initialize(new Configuration());
+    Map<String, StringWrapper> deepCopy = deepCopier.deepCopy(map);
+
+    assertEquals(map, deepCopy);
+    assertNotSame(map.get(key), deepCopy.get(key));
+  }
+  
+  @Test
+  public void testDeepCopy_Null() {
+    Map<String, StringWrapper> map = null;
+
+    MapDeepCopier<StringWrapper> deepCopier = new MapDeepCopier<StringWrapper>(
+        Avros.reflects(StringWrapper.class));
+    deepCopier.initialize(new Configuration());
+    Map<String, StringWrapper> deepCopy = deepCopier.deepCopy(map);
+
+    assertNull(deepCopy);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java b/crunch-core/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
new file mode 100644
index 0000000..e6fd90c
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Collection;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PTypeUtilsTest {
+  @Test
+  public void testPrimitives() {
+    assertEquals(Avros.strings(), AvroTypeFamily.getInstance().as(Writables.strings()));
+    Assert.assertEquals(Writables.doubles(), WritableTypeFamily.getInstance().as(Avros.doubles()));
+  }
+
+  @Test
+  public void testTuple3() {
+    PType<Tuple3<String, Float, Integer>> t = Writables.triples(Writables.strings(), Writables.floats(),
+        Writables.ints());
+    PType<Tuple3<String, Float, Integer>> at = AvroTypeFamily.getInstance().as(t);
+    assertEquals(Avros.strings(), at.getSubTypes().get(0));
+    assertEquals(Avros.floats(), at.getSubTypes().get(1));
+    assertEquals(Avros.ints(), at.getSubTypes().get(2));
+  }
+
+  @Test
+  public void testTupleN() {
+    PType<TupleN> t = Avros.tuples(Avros.strings(), Avros.floats(), Avros.ints());
+    PType<TupleN> wt = WritableTypeFamily.getInstance().as(t);
+    assertEquals(Writables.strings(), wt.getSubTypes().get(0));
+    assertEquals(Writables.floats(), wt.getSubTypes().get(1));
+    assertEquals(Writables.ints(), wt.getSubTypes().get(2));
+  }
+
+  @Test
+  public void testWritableCollections() {
+    PType<Collection<String>> t = Avros.collections(Avros.strings());
+    t = WritableTypeFamily.getInstance().as(t);
+    assertEquals(Writables.strings(), t.getSubTypes().get(0));
+  }
+
+  @Test
+  public void testAvroCollections() {
+    PType<Collection<Double>> t = Writables.collections(Writables.doubles());
+    t = AvroTypeFamily.getInstance().as(t);
+    assertEquals(Avros.doubles(), t.getSubTypes().get(0));
+  }
+
+  @Test
+  public void testAvroRegistered() {
+    AvroType<Utf8> at = new AvroType<Utf8>(Utf8.class, Schema.create(Schema.Type.STRING), new DeepCopier.NoOpDeepCopier<Utf8>());
+    Avros.register(Utf8.class, at);
+    assertEquals(at, Avros.records(Utf8.class));
+  }
+
+  @Test
+  public void testWritableBuiltin() {
+    assertNotNull(Writables.records(Text.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/PTypesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/PTypesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/PTypesTest.java
new file mode 100644
index 0000000..d7c8811
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/PTypesTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.junit.Test;
+
+public class PTypesTest {
+  @Test
+  public void testUUID() throws Exception {
+    UUID uuid = UUID.randomUUID();
+    PType<UUID> ptype = PTypes.uuid(AvroTypeFamily.getInstance());
+    assertEquals(uuid, ptype.getInputMapFn().map(ptype.getOutputMapFn().map(uuid)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
new file mode 100644
index 0000000..e46a680
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TupleDeepCopierTest {
+
+  @Test
+  public void testDeepCopy_Pair() {
+    Person person = new Person();
+    person.name = "John Doe";
+    person.age = 42;
+    person.siblingnames = Lists.<CharSequence> newArrayList();
+
+    Pair<Integer, Person> inputPair = Pair.of(1, person);
+    DeepCopier<Pair> deepCopier = new TupleDeepCopier<Pair>(Pair.class, Avros.ints(),
+        Avros.records(Person.class));
+
+    deepCopier.initialize(new Configuration());
+    Pair<Integer, Person> deepCopyPair = deepCopier.deepCopy(inputPair);
+
+    assertEquals(inputPair, deepCopyPair);
+    assertNotSame(inputPair.second(), deepCopyPair.second());
+  }
+  
+  @Test
+  public void testDeepCopy_PairContainingNull() {
+
+    Pair<Integer, Person> inputPair = Pair.of(1, null);
+    DeepCopier<Pair> deepCopier = new TupleDeepCopier<Pair>(Pair.class, Avros.ints(),
+        Avros.records(Person.class));
+
+    deepCopier.initialize(new Configuration());
+    Pair<Integer, Person> deepCopyPair = deepCopier.deepCopy(inputPair);
+
+    assertEquals(inputPair, deepCopyPair);
+  }
+  
+  @Test
+  public void testDeepCopy_NullPair() {
+    Pair<Integer, Person> inputPair = null;
+    DeepCopier<Pair> deepCopier = new TupleDeepCopier<Pair>(Pair.class, Avros.ints(),
+        Avros.records(Person.class));
+
+    deepCopier.initialize(new Configuration());
+    Pair<Integer, Person> deepCopyPair = deepCopier.deepCopy(inputPair);
+
+    assertNull(deepCopyPair);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/TupleFactoryTest.java b/crunch-core/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
new file mode 100644
index 0000000..0726be2
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.junit.Test;
+
+public class TupleFactoryTest {
+
+  @Test
+  public void testGetTupleFactory_Pair() {
+    assertEquals(TupleFactory.PAIR, TupleFactory.getTupleFactory(Pair.class));
+  }
+
+  @Test
+  public void testGetTupleFactory_Tuple3() {
+    assertEquals(TupleFactory.TUPLE3, TupleFactory.getTupleFactory(Tuple3.class));
+  }
+
+  @Test
+  public void testGetTupleFactory_Tuple4() {
+    assertEquals(TupleFactory.TUPLE4, TupleFactory.getTupleFactory(Tuple4.class));
+  }
+
+  @Test
+  public void testGetTupleFactory_TupleN() {
+    assertEquals(TupleFactory.TUPLEN, TupleFactory.getTupleFactory(TupleN.class));
+  }
+
+  public void testGetTupleFactory_CustomTupleClass() {
+	TupleFactory<CustomTupleImplementation> customTupleFactory = TupleFactory.create(CustomTupleImplementation.class);
+    assertEquals(customTupleFactory, TupleFactory.getTupleFactory(CustomTupleImplementation.class));
+  }
+
+  private static class CustomTupleImplementation implements Tuple {
+
+    @Override
+    public Object get(int index) {
+      return null;
+    }
+
+    @Override
+    public int size() {
+      return 0;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
new file mode 100644
index 0000000..37c13c0
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+
+import java.util.List;
+
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class AvroDeepCopierTest {
+  
+  @Test
+  public void testDeepCopySpecific() {
+    Person person = new Person();
+    person.name = "John Doe";
+    person.age = 42;
+    person.siblingnames = Lists.<CharSequence> newArrayList();
+
+    Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
+        .deepCopy(person);
+
+    assertEquals(person, deepCopyPerson);
+    assertNotSame(person, deepCopyPerson);
+  }
+
+  @Test
+  public void testDeepCopyGeneric() {
+    Record record = new Record(Person.SCHEMA$);
+    record.put("name", "John Doe");
+    record.put("age", 42);
+    record.put("siblingnames", Lists.newArrayList());
+
+    Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$)
+        .deepCopy(record);
+
+    assertEquals(record, deepCopyRecord);
+    assertNotSame(record, deepCopyRecord);
+  }
+
+  static class ReflectedPerson {
+    String name;
+    int age;
+    List<String> siblingnames;
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof ReflectedPerson)) {
+        return false;
+      }
+      ReflectedPerson that = (ReflectedPerson) other;
+      return name.equals(that.name) && age == that.age && siblingnames.equals(that.siblingnames);
+    }
+  }
+
+  @Test
+  public void testDeepCopyReflect() {
+    ReflectedPerson person = new ReflectedPerson();
+    person.name = "John Doe";
+    person.age = 42;
+    person.siblingnames = Lists.newArrayList();
+
+    AvroDeepCopier<ReflectedPerson> avroDeepCopier = new AvroDeepCopier.AvroReflectDeepCopier<ReflectedPerson>(
+        ReflectedPerson.class, Avros.reflects(ReflectedPerson.class).getSchema());
+    avroDeepCopier.initialize(new Configuration());
+
+    ReflectedPerson deepCopyPerson = avroDeepCopier.deepCopy(person);
+
+    assertEquals(person, deepCopyPerson);
+    assertNotSame(person, deepCopyPerson);
+
+  }
+  
+  @Test
+  public void testDeepCopy_Null() {
+    Person person = null;
+
+    Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
+        .deepCopy(person);
+
+    assertNull(deepCopyPerson);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java
new file mode 100644
index 0000000..db9ebdc
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class AvroGroupedTableTypeTest {
+
+  @Test
+  public void testGetDetachedValue() {
+    Integer integerValue = 42;
+    Person person = new Person();
+    person.name = "John Doe";
+    person.age = 42;
+    person.siblingnames = Lists.<CharSequence> newArrayList();
+
+    Iterable<Person> inputPersonIterable = Lists.newArrayList(person);
+    Pair<Integer, Iterable<Person>> pair = Pair.of(integerValue, inputPersonIterable);
+
+    PGroupedTableType<Integer, Person> groupedTableType = Avros.tableOf(Avros.ints(),
+        Avros.specifics(Person.class)).getGroupedTableType();
+    groupedTableType.initialize(new Configuration());
+
+    Pair<Integer, Iterable<Person>> detachedPair = groupedTableType.getDetachedValue(pair);
+
+    assertSame(integerValue, detachedPair.first());
+    List<Person> personList = Lists.newArrayList(detachedPair.second());
+    assertEquals(inputPersonIterable, personList);
+    assertNotSame(person, personList.get(0));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
new file mode 100644
index 0000000..35d4e5b
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class AvroTableTypeTest {
+
+  @Test
+  public void testGetDetachedValue() {
+    Integer integerValue = 42;
+    Person person = new Person();
+    person.name = "John Doe";
+    person.age = 42;
+    person.siblingnames = Lists.<CharSequence> newArrayList();
+
+    Pair<Integer, Person> pair = Pair.of(integerValue, person);
+
+    AvroTableType<Integer, Person> tableType = Avros.tableOf(Avros.ints(),
+        Avros.specifics(Person.class));
+    tableType.initialize(new Configuration());
+
+    Pair<Integer, Person> detachedPair = tableType.getDetachedValue(pair);
+
+    assertSame(integerValue, detachedPair.first());
+    assertEquals(person, detachedPair.second());
+    assertNotSame(person, detachedPair.second());
+  }
+
+  @Test
+  public void testIsReflect_ContainsReflectKey() {
+    assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).hasReflect());
+  }
+
+  @Test
+  public void testIsReflect_ContainsReflectValue() {
+    assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).hasReflect());
+  }
+
+  @Test
+  public void testReflect_NoReflectKeyOrValue() {
+    assertFalse(Avros.tableOf(Avros.ints(), Avros.ints()).hasReflect());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
new file mode 100644
index 0000000..a874c63
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.crunch.Pair;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class AvroTypeTest {
+
+  @Test
+  public void testIsSpecific_SpecificData() {
+    assertTrue(Avros.records(Person.class).hasSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_SpecificData() {
+    assertFalse(Avros.records(Person.class).isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_GenericData() {
+    assertFalse(Avros.generics(Person.SCHEMA$).hasSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_GenericData() {
+    assertTrue(Avros.generics(Person.SCHEMA$).isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_NonAvroClass() {
+    assertFalse(Avros.ints().hasSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_NonAvroClass() {
+    assertFalse(Avros.ints().isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_SpecificAvroTable() {
+    assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).hasSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_SpecificAvroTable() {
+    assertFalse(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_GenericAvroTable() {
+    assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).hasSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_GenericAvroTable() {
+    assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isGeneric());
+  }
+
+  @Test
+  public void testIsReflect_GenericType() {
+    assertFalse(Avros.generics(Person.SCHEMA$).hasReflect());
+  }
+
+  @Test
+  public void testIsReflect_SpecificType() {
+    assertFalse(Avros.records(Person.class).hasReflect());
+  }
+
+  @Test
+  public void testIsReflect_ReflectSimpleType() {
+    assertTrue(Avros.reflects(StringWrapper.class).hasReflect());
+  }
+
+  @Test
+  public void testIsReflect_NonReflectSubType() {
+    assertFalse(Avros.pairs(Avros.ints(), Avros.ints()).hasReflect());
+  }
+
+  @Test
+  public void testIsReflect_ReflectSubType() {
+    assertTrue(Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class)).hasReflect());
+  }
+
+  @Test
+  public void testIsReflect_TableOfNonReflectTypes() {
+    assertFalse(Avros.tableOf(Avros.ints(), Avros.strings()).hasReflect());
+  }
+
+  @Test
+  public void testIsReflect_TableWithReflectKey() {
+    assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).hasReflect());
+  }
+
+  @Test
+  public void testIsReflect_TableWithReflectValue() {
+    assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).hasReflect());
+  }
+
+  @Test
+  public void testReflect_CollectionContainingReflectValue() {
+    assertTrue(Avros.collections(Avros.reflects(StringWrapper.class)).hasReflect());
+  }
+
+  @Test
+  public void testReflect_CollectionNotContainingReflectValue() {
+    assertFalse(Avros.collections(Avros.generics(Person.SCHEMA$)).hasReflect());
+  }
+
+  @Test
+  public void testGetDetachedValue_AlreadyMappedAvroType() {
+    Integer value = 42;
+    AvroType<Integer> intType = Avros.ints();
+    intType.initialize(new Configuration());
+    Integer detachedValue = intType.getDetachedValue(value);
+    assertSame(value, detachedValue);
+  }
+
+  @Test
+  public void testGetDetachedValue_GenericAvroType() {
+    AvroType<Record> genericType = Avros.generics(Person.SCHEMA$);
+    genericType.initialize(new Configuration());
+    GenericData.Record record = new GenericData.Record(Person.SCHEMA$);
+    record.put("name", "name value");
+    record.put("age", 42);
+    record.put("siblingnames", Lists.newArrayList());
+
+    Record detachedRecord = genericType.getDetachedValue(record);
+    assertEquals(record, detachedRecord);
+    assertNotSame(record, detachedRecord);
+  }
+
+  private Person createPerson() {
+    Person person = new Person();
+    person.name = "name value";
+    person.age = 42;
+    person.siblingnames = Lists.<CharSequence> newArrayList();
+    return person;
+  }
+
+  @Test
+  public void testGetDetachedValue_SpecificAvroType() {
+    AvroType<Person> specificType = Avros.specifics(Person.class);
+    specificType.initialize(new Configuration());
+    Person person = createPerson();
+    Person detachedPerson = specificType.getDetachedValue(person);
+    assertEquals(person, detachedPerson);
+    assertNotSame(person, detachedPerson);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testGetDetachedValue_NotInitialized() {
+    AvroType<Person> specificType = Avros.specifics(Person.class);
+    Person person = createPerson();
+    specificType.getDetachedValue(person);
+  }
+
+  static class ReflectedPerson {
+    String name;
+    int age;
+    List<String> siblingnames;
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof ReflectedPerson)) {
+        return false;
+      }
+      ReflectedPerson that = (ReflectedPerson) other;
+      return name.equals(that.name) && age == that.age && siblingnames.equals(that.siblingnames);
+    }
+  }
+
+  @Test
+  public void testGetDetachedValue_ReflectAvroType() {
+    AvroType<ReflectedPerson> reflectType = Avros.reflects(ReflectedPerson.class);
+    reflectType.initialize(new Configuration());
+    ReflectedPerson rp = new ReflectedPerson();
+    rp.name = "josh";
+    rp.age = 32;
+    rp.siblingnames = Lists.newArrayList();
+    ReflectedPerson detached = reflectType.getDetachedValue(rp);
+    assertEquals(rp, detached);
+    assertNotSame(rp, detached);
+  }
+
+  @Test
+  public void testGetDetachedValue_Pair() {
+    Person person = createPerson();
+    AvroType<Pair<Integer, Person>> pairType = Avros.pairs(Avros.ints(),
+        Avros.records(Person.class));
+    pairType.initialize(new Configuration());
+
+    Pair<Integer, Person> inputPair = Pair.of(1, person);
+    Pair<Integer, Person> detachedPair = pairType.getDetachedValue(inputPair);
+
+    assertEquals(inputPair, detachedPair);
+    assertNotSame(inputPair.second(), detachedPair.second());
+  }
+
+  @Test
+  public void testGetDetachedValue_Collection() {
+    Person person = createPerson();
+    List<Person> personList = Lists.newArrayList(person);
+
+    AvroType<Collection<Person>> collectionType = Avros.collections(Avros.records(Person.class));
+    collectionType.initialize(new Configuration());
+
+    Collection<Person> detachedCollection = collectionType.getDetachedValue(personList);
+
+    assertEquals(personList, detachedCollection);
+    Person detachedPerson = detachedCollection.iterator().next();
+
+    assertNotSame(person, detachedPerson);
+  }
+
+  @Test
+  public void testGetDetachedValue_Map() {
+    String key = "key";
+    Person value = createPerson();
+
+    Map<String, Person> stringPersonMap = Maps.newHashMap();
+    stringPersonMap.put(key, value);
+
+    AvroType<Map<String, Person>> mapType = Avros.maps(Avros.records(Person.class));
+    mapType.initialize(new Configuration());
+
+    Map<String, Person> detachedMap = mapType.getDetachedValue(stringPersonMap);
+
+    assertEquals(stringPersonMap, detachedMap);
+    assertNotSame(value, detachedMap.get(key));
+  }
+
+  @Test
+  public void testGetDetachedValue_TupleN() {
+    Person person = createPerson();
+    AvroType<TupleN> ptype = Avros.tuples(Avros.records(Person.class));
+    ptype.initialize(new Configuration());
+    TupleN tuple = new TupleN(person);
+    TupleN detachedTuple = ptype.getDetachedValue(tuple);
+
+    assertEquals(tuple, detachedTuple);
+    assertNotSame(person, detachedTuple.get(0));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
new file mode 100644
index 0000000..5622a56
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.util.Utf8;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * TODO test Avros.register and Avros.containers
+ */
+public class AvrosTest {
+
+  @Test
+  public void testNulls() throws Exception {
+    Void n = null;
+    testInputOutputFn(Avros.nulls(), n, n);
+  }
+
+  @Test
+  public void testStrings() throws Exception {
+    String s = "abc";
+    Utf8 w = new Utf8(s);
+    testInputOutputFn(Avros.strings(), s, w);
+  }
+
+  @Test
+  public void testInts() throws Exception {
+    int j = 55;
+    testInputOutputFn(Avros.ints(), j, j);
+  }
+
+  @Test
+  public void testLongs() throws Exception {
+    long j = Long.MAX_VALUE;
+    testInputOutputFn(Avros.longs(), j, j);
+  }
+
+  @Test
+  public void testFloats() throws Exception {
+    float j = Float.MIN_VALUE;
+    testInputOutputFn(Avros.floats(), j, j);
+  }
+
+  @Test
+  public void testDoubles() throws Exception {
+    double j = Double.MIN_VALUE;
+    testInputOutputFn(Avros.doubles(), j, j);
+  }
+
+  @Test
+  public void testBooleans() throws Exception {
+    boolean j = true;
+    testInputOutputFn(Avros.booleans(), j, j);
+  }
+
+  @Test
+  public void testBytes() throws Exception {
+    byte[] bytes = new byte[] { 17, 26, -98 };
+    ByteBuffer bb = ByteBuffer.wrap(bytes);
+    testInputOutputFn(Avros.bytes(), bb, bb);
+  }
+
+  @Test
+  public void testCollections() throws Exception {
+    Collection<String> j = Lists.newArrayList();
+    j.add("a");
+    j.add("b");
+    Schema collectionSchema = Schema.createArray(Schema.createUnion(ImmutableList.of(Avros.strings().getSchema(),
+        Schema.create(Type.NULL))));
+    GenericData.Array<Utf8> w = new GenericData.Array<Utf8>(2, collectionSchema);
+    w.add(new Utf8("a"));
+    w.add(new Utf8("b"));
+    testInputOutputFn(Avros.collections(Avros.strings()), j, w);
+  }
+
+  @Test
+  public void testNestedTables() throws Exception {
+    PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
+    String schema = Avros.tableOf(pll, Avros.strings()).getSchema().toString();
+    assertNotNull(schema);
+  }
+
+  @Test
+  public void testPairs() throws Exception {
+    AvroType<Pair<String, String>> at = Avros.pairs(Avros.strings(), Avros.strings());
+    Pair<String, String> j = Pair.of("a", "b");
+    GenericData.Record w = new GenericData.Record(at.getSchema());
+    w.put(0, new Utf8("a"));
+    w.put(1, new Utf8("b"));
+    testInputOutputFn(at, j, w);
+  }
+
+  @Test
+  public void testPairEquals() throws Exception {
+    AvroType<Pair<Long, ByteBuffer>> at1 = Avros.pairs(Avros.longs(), Avros.bytes());
+    AvroType<Pair<Long, ByteBuffer>> at2 = Avros.pairs(Avros.longs(), Avros.bytes());
+    assertEquals(at1, at2);
+    assertEquals(at1.hashCode(), at2.hashCode());
+  }
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testTriples() throws Exception {
+    AvroType at = Avros.triples(Avros.strings(), Avros.strings(), Avros.strings());
+    Tuple3 j = Tuple3.of("a", "b", "c");
+    GenericData.Record w = new GenericData.Record(at.getSchema());
+    w.put(0, new Utf8("a"));
+    w.put(1, new Utf8("b"));
+    w.put(2, new Utf8("c"));
+    testInputOutputFn(at, j, w);
+  }
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testQuads() throws Exception {
+    AvroType at = Avros.quads(Avros.strings(), Avros.strings(), Avros.strings(), Avros.strings());
+    Tuple4 j = Tuple4.of("a", "b", "c", "d");
+    GenericData.Record w = new GenericData.Record(at.getSchema());
+    w.put(0, new Utf8("a"));
+    w.put(1, new Utf8("b"));
+    w.put(2, new Utf8("c"));
+    w.put(3, new Utf8("d"));
+    testInputOutputFn(at, j, w);
+  }
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testTupleN() throws Exception {
+    AvroType at = Avros.tuples(Avros.strings(), Avros.strings(), Avros.strings(), Avros.strings(), Avros.strings());
+    TupleN j = new TupleN("a", "b", "c", "d", "e");
+    GenericData.Record w = new GenericData.Record(at.getSchema());
+    w.put(0, new Utf8("a"));
+    w.put(1, new Utf8("b"));
+    w.put(2, new Utf8("c"));
+    w.put(3, new Utf8("d"));
+    w.put(4, new Utf8("e"));
+    testInputOutputFn(at, j, w);
+
+  }
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testWritables() throws Exception {
+    AvroType at = Avros.writables(LongWritable.class);
+    
+    TaskInputOutputContext<?, ?, ?, ?> testContext = CrunchTestSupport.getTestContext(new Configuration());
+    at.getInputMapFn().setContext(testContext);
+    at.getInputMapFn().initialize();
+    at.getOutputMapFn().setContext(testContext);
+    at.getOutputMapFn().initialize();
+    
+    LongWritable lw = new LongWritable(1729L);
+    assertEquals(lw, at.getInputMapFn().map(at.getOutputMapFn().map(lw)));
+  }
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testTableOf() throws Exception {
+    AvroType at = Avros.tableOf(Avros.strings(), Avros.strings());
+    Pair<String, String> j = Pair.of("a", "b");
+    org.apache.avro.mapred.Pair w = new org.apache.avro.mapred.Pair(at.getSchema());
+    w.put(0, new Utf8("a"));
+    w.put(1, new Utf8("b"));
+    // TODO update this after resolving the o.a.a.m.Pair.equals issue
+    initialize(at);
+    assertEquals(j, at.getInputMapFn().map(w));
+    org.apache.avro.mapred.Pair converted = (org.apache.avro.mapred.Pair) at.getOutputMapFn().map(j);
+    assertEquals(w.key(), converted.key());
+    assertEquals(w.value(), converted.value());
+  }
+
+  private static void initialize(PType ptype) {
+    ptype.getInputMapFn().initialize();
+    ptype.getOutputMapFn().initialize();
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  protected static void testInputOutputFn(PType ptype, Object java, Object avro) {
+    initialize(ptype);
+    assertEquals(java, ptype.getInputMapFn().map(avro));
+    assertEquals(avro, ptype.getOutputMapFn().map(java));
+  }
+
+  @Test
+  public void testIsPrimitive_PrimitiveMappedType() {
+    assertTrue(Avros.isPrimitive(Avros.ints()));
+  }
+
+  @Test
+  public void testIsPrimitive_TruePrimitiveValue() {
+    AvroType truePrimitiveAvroType = new AvroType(int.class, Schema.create(Type.INT), new DeepCopier.NoOpDeepCopier());
+    assertTrue(Avros.isPrimitive(truePrimitiveAvroType));
+  }
+
+  @Test
+  public void testIsPrimitive_False() {
+    assertFalse(Avros.isPrimitive(Avros.reflects(Person.class)));
+  }
+
+  @Test
+  public void testPairs_Generic() {
+    Schema schema = ReflectData.get().getSchema(IntWritable.class);
+
+    GenericData.Record recordA = new GenericData.Record(schema);
+    GenericData.Record recordB = new GenericData.Record(schema);
+
+    AvroType<Pair<Record, Record>> pairType = Avros.pairs(Avros.generics(schema), Avros.generics(schema));
+    Pair<Record, Record> pair = Pair.of(recordA, recordB);
+    pairType.getOutputMapFn().initialize();
+    pairType.getInputMapFn().initialize();
+    Object mapped = pairType.getOutputMapFn().map(pair);
+    Pair<Record, Record> doubleMappedPair = pairType.getInputMapFn().map(mapped);
+
+    assertEquals(pair, doubleMappedPair);
+    mapped.hashCode();
+  }
+
+  @Test
+  public void testPairs_Reflect() {
+    IntWritable intWritableA = new IntWritable(1);
+    IntWritable intWritableB = new IntWritable(2);
+
+    AvroType<Pair<IntWritable, IntWritable>> pairType = Avros.pairs(Avros.reflects(IntWritable.class),
+        Avros.reflects(IntWritable.class));
+    Pair<IntWritable, IntWritable> pair = Pair.of(intWritableA, intWritableB);
+    pairType.getOutputMapFn().initialize();
+    pairType.getInputMapFn().initialize();
+    Object mapped = pairType.getOutputMapFn().map(pair);
+
+    Pair<IntWritable, IntWritable> doubleMappedPair = pairType.getInputMapFn().map(mapped);
+
+    assertEquals(pair, doubleMappedPair);
+  }
+
+  @Test
+  public void testPairs_Specific() {
+    Person personA = new Person();
+    Person personB = new Person();
+
+    personA.age = 1;
+    personA.name = "A";
+    personA.siblingnames = Collections.<CharSequence> emptyList();
+
+    personB.age = 2;
+    personB.name = "B";
+    personB.siblingnames = Collections.<CharSequence> emptyList();
+
+    AvroType<Pair<Person, Person>> pairType = Avros.pairs(Avros.records(Person.class), Avros.records(Person.class));
+
+    Pair<Person, Person> pair = Pair.of(personA, personB);
+    pairType.getOutputMapFn().initialize();
+    pairType.getInputMapFn().initialize();
+
+    Object mapped = pairType.getOutputMapFn().map(pair);
+    Pair<Person, Person> doubleMappedPair = pairType.getInputMapFn().map(mapped);
+
+    assertEquals(pair, doubleMappedPair);
+
+  }
+
+  @Test
+  public void testPairOutputMapFn_VerifyNoObjectReuse() {
+    StringWrapper stringWrapper = new StringWrapper("Test");
+
+    Pair<Integer, StringWrapper> pair = Pair.of(1, stringWrapper);
+
+    AvroType<Pair<Integer, StringWrapper>> pairType = Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class));
+
+    pairType.getOutputMapFn().initialize();
+
+    Object outputMappedValueA = pairType.getOutputMapFn().map(pair);
+    Object outputMappedValueB = pairType.getOutputMapFn().map(pair);
+
+    assertEquals(outputMappedValueA, outputMappedValueB);
+    assertNotSame(outputMappedValueA, outputMappedValueB);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
new file mode 100644
index 0000000..c807a90
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+
+import org.apache.crunch.test.Tests;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+
+public class GenericArrayWritableTest {
+
+  @Test
+  public void testEmpty() {
+    GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
+    src.set(new Text[0]);
+
+    GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+
+    assertThat(dest.get().length, is(0));
+  }
+
+  @Test
+  public void testNonEmpty() {
+    GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
+    src.set(new Text[] { new Text("foo"), new Text("bar") });
+
+    GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+
+    assertThat(src.get(), not(sameInstance(dest.get())));
+    assertThat(dest.get().length, is(2));
+    assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("foo"), new Text("bar")));
+  }
+
+  @Test
+  public void testNulls() {
+    GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
+    src.set(new Text[] { new Text("a"), null, new Text("b") });
+
+    GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+
+    assertThat(src.get(), not(sameInstance(dest.get())));
+    assertThat(dest.get().length, is(3));
+    assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("a"), new Text("b"), null));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
new file mode 100644
index 0000000..c49491b
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class WritableDeepCopierTest {
+
+  private WritableDeepCopier<Text> deepCopier;
+
+  @Before
+  public void setUp() {
+    deepCopier = new WritableDeepCopier<Text>(Text.class);
+  }
+
+  @Test
+  public void testDeepCopy() {
+    Text text = new Text("value");
+    Text deepCopy = deepCopier.deepCopy(text);
+
+    assertEquals(text, deepCopy);
+    assertNotSame(text, deepCopy);
+  }
+  
+  @Test
+  public void testDeepCopy_Null() {
+    Text text = null;
+    Text deepCopy = deepCopier.deepCopy(text);
+    
+    assertNull(deepCopy);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
new file mode 100644
index 0000000..f6c201b
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class WritableGroupedTableTypeTest {
+
+  @Test
+  public void testGetDetachedValue() {
+    Integer integerValue = 42;
+    Text textValue = new Text("forty-two");
+    Iterable<Text> inputTextIterable = Lists.newArrayList(textValue);
+    Pair<Integer, Iterable<Text>> pair = Pair.of(integerValue, inputTextIterable);
+
+    PGroupedTableType<Integer, Text> groupedTableType = Writables.tableOf(Writables.ints(),
+        Writables.writables(Text.class)).getGroupedTableType();
+    groupedTableType.initialize(new Configuration());
+
+    Pair<Integer, Iterable<Text>> detachedPair = groupedTableType.getDetachedValue(pair);
+
+    assertSame(integerValue, detachedPair.first());
+    List<Text> textList = Lists.newArrayList(detachedPair.second());
+    assertEquals(inputTextIterable, textList);
+    assertNotSame(textValue, textList.get(0));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
new file mode 100644
index 0000000..697a28c
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import org.apache.crunch.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class WritableTableTypeTest {
+
+  @Test
+  public void testGetDetachedValue() {
+    Integer integerValue = 42;
+    Text textValue = new Text("forty-two");
+    Pair<Integer, Text> pair = Pair.of(integerValue, textValue);
+
+    WritableTableType<Integer, Text> tableType = Writables.tableOf(Writables.ints(),
+        Writables.writables(Text.class));
+    tableType.initialize(new Configuration());
+    Pair<Integer, Text> detachedPair = tableType.getDetachedValue(pair);
+
+    assertSame(integerValue, detachedPair.first());
+    assertEquals(textValue, detachedPair.second());
+    assertNotSame(textValue, detachedPair.second());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
new file mode 100644
index 0000000..65e946b
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class WritableTypeTest {
+
+  @Test(expected = IllegalStateException.class)
+  public void testGetDetachedValue_NotInitialized() {
+    WritableType<Text, Text> textWritableType = Writables.writables(Text.class);
+    Text value = new Text("test");
+
+    // Calling getDetachedValue without first calling initialize should throw an
+    // exception
+    textWritableType.getDetachedValue(value);
+  }
+
+  @Test
+  public void testGetDetachedValue_CustomWritable() {
+    WritableType<Text, Text> textWritableType = Writables.writables(Text.class);
+    textWritableType.initialize(new Configuration());
+    Text value = new Text("test");
+
+    Text detachedValue = textWritableType.getDetachedValue(value);
+    assertEquals(value, detachedValue);
+    assertNotSame(value, detachedValue);
+  }
+
+  @Test
+  public void testGetDetachedValue_Collection() {
+    Collection<Text> textCollection = Lists.newArrayList(new Text("value"));
+    WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables
+        .collections(Writables.writables(Text.class));
+    ptype.initialize(new Configuration());
+
+    Collection<Text> detachedCollection = ptype.getDetachedValue(textCollection);
+    assertEquals(textCollection, detachedCollection);
+    assertNotSame(textCollection.iterator().next(), detachedCollection.iterator().next());
+  }
+
+  @Test
+  public void testGetDetachedValue_Tuple() {
+    Pair<Text, Text> textPair = Pair.of(new Text("one"), new Text("two"));
+    WritableType<Pair<Text, Text>, TupleWritable> ptype = Writables.pairs(
+        Writables.writables(Text.class), Writables.writables(Text.class));
+    ptype.initialize(new Configuration());
+
+    Pair<Text, Text> detachedPair = ptype.getDetachedValue(textPair);
+    assertEquals(textPair, detachedPair);
+    assertNotSame(textPair.first(), detachedPair.first());
+    assertNotSame(textPair.second(), detachedPair.second());
+  }
+
+  @Test
+  public void testGetDetachedValue_Map() {
+    Map<String, Text> stringTextMap = Maps.newHashMap();
+    stringTextMap.put("key", new Text("value"));
+
+    WritableType<Map<String, Text>, MapWritable> ptype = Writables.maps(Writables
+        .writables(Text.class));
+    ptype.initialize(new Configuration());
+    Map<String, Text> detachedMap = ptype.getDetachedValue(stringTextMap);
+
+    assertEquals(stringTextMap, detachedMap);
+    assertNotSame(stringTextMap.get("key"), detachedMap.get("key"));
+  }
+
+}