You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/08/09 22:59:55 UTC

git commit: Correct Avro deep copying on Pair and Collection.

Updated Branches:
  refs/heads/master 547d56fa8 -> 153617a44


Correct Avro deep copying on Pair and Collection.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/153617a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/153617a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/153617a4

Branch: refs/heads/master
Commit: 153617a4444e439b6fe9dc06d3d4c5a3859307da
Parents: 547d56f
Author: Gabriel Reid <ga...@gmail.com>
Authored: Thu Aug 9 22:53:55 2012 +0200
Committer: Gabriel Reid <ga...@gmail.com>
Committed: Thu Aug 9 22:53:55 2012 +0200

----------------------------------------------------------------------
 .../apache/crunch/types/CollectionDeepCopier.java  |   49 ++++++++++
 .../java/org/apache/crunch/types/DeepCopier.java   |   37 ++++++++
 .../org/apache/crunch/types/TupleDeepCopier.java   |   52 +++++++++++
 .../java/org/apache/crunch/types/TupleFactory.java |   23 +++++
 .../apache/crunch/types/avro/AvroDeepCopier.java   |   13 ++-
 .../org/apache/crunch/types/avro/AvroType.java     |   24 ++++--
 .../java/org/apache/crunch/types/avro/Avros.java   |   64 ++++++++------
 .../crunch/types/CollectionDeepCopierTest.java     |   31 +++++++
 .../apache/crunch/types/TupleDeepCopierTest.java   |   49 ++++++++++
 .../org/apache/crunch/types/TupleFactoryTest.java  |   69 +++++++++++++++
 .../crunch/types/avro/AvroDeepCopierTest.java      |   10 ++-
 .../org/apache/crunch/types/avro/AvroTypeTest.java |   50 +++++++++--
 12 files changed, 421 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
new file mode 100644
index 0000000..5bdc715
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.thirdparty.guava.common.collect.Lists;
+
+/**
+ * Performs deep copies (based on underlying PType deep copying) of Collections.
+ * 
+ * @param <T>
+ *          The type of the elements in the Collection being copied
+ */
+public class CollectionDeepCopier<T> implements DeepCopier<Collection<T>>  {
+
+  
+  private PType<T> elementType;
+
+  public CollectionDeepCopier(PType<T> elementType) {
+    this.elementType = elementType;
+  }
+
+  @Override
+  public Collection<T> deepCopy(Collection<T> source) {
+    List<T> copiedCollection = Lists.newArrayListWithCapacity(source.size());
+    for (T value : source){
+      copiedCollection.add(elementType.getDetachedValue(value));
+    }
+    return copiedCollection;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
new file mode 100644
index 0000000..539b808
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+/**
+ * Performs deep copies of values.
+ * 
+ * @param <T>
+ *          The type of value that will be copied
+ */
+public interface DeepCopier<T> {
+
+  /**
+   * Create a deep copy of a value.
+   * 
+   * @param source
+   *          The value to be copied
+   * @return The deep copy of the value
+   */
+  T deepCopy(T source);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
new file mode 100644
index 0000000..094b582
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.List;
+
+import org.apache.crunch.Tuple;
+
+/**
+ * Performs deep copies (based on underlying PType deep copying) of Tuple-based
+ * objects.
+ * 
+ * @param <T>
+ *          The type of Tuple implementation being copied
+ */
+public class TupleDeepCopier<T extends Tuple> implements DeepCopier<T> {
+
+  private final TupleFactory<T> tupleFactory;
+  private final List<PType> elementTypes;
+
+  public TupleDeepCopier(PType<T> ptype) {
+    tupleFactory = TupleFactory.getTupleFactory(ptype.getTypeClass());
+    elementTypes = ptype.getSubTypes();
+  }
+
+  @Override
+  public T deepCopy(T source) {
+    Object[] deepCopyValues = new Object[source.size()];
+
+    for (int valueIndex = 0; valueIndex < elementTypes.size(); valueIndex++) {
+      PType elementType = elementTypes.get(valueIndex);
+      deepCopyValues[valueIndex] = elementType.getDetachedValue(source.get(valueIndex));
+    }
+
+    return tupleFactory.makeTuple(deepCopyValues);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
index 16c3bcd..c547cd6 100644
--- a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
@@ -34,6 +34,28 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable {
 
   public abstract T makeTuple(Object... values);
 
+  /**
+   * Get the {@link TupleFactory} for a given Tuple implementation. Only
+   * standard Tuple implementations are supported.
+   * 
+   * @param tupleClass
+   *          The class for which the factory is to be retrieved
+   * @return The appropriate TupleFactory
+   */
+  public static <T extends Tuple> TupleFactory<T> getTupleFactory(Class<T> tupleClass) {
+    if (tupleClass == Pair.class) {
+      return (TupleFactory<T>) PAIR;
+    } else if (tupleClass == Tuple3.class) {
+      return (TupleFactory<T>) TUPLE3;
+    } else if (tupleClass == Tuple4.class) {
+      return (TupleFactory<T>) TUPLE4;
+    } else if (tupleClass == TupleN.class) {
+      return (TupleFactory<T>) TUPLEN;
+    } else {
+      throw new IllegalArgumentException("Can't create TupleFactory for " + tupleClass);
+    }
+  }
+
   public static final TupleFactory<Pair> PAIR = new TupleFactory<Pair>() {
     @Override
     public Pair makeTuple(Object... values) {
@@ -96,4 +118,5 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable {
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 078353a..ad5ba04 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -36,6 +36,7 @@ import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.types.DeepCopier;
 
 /**
  * Performs deep copies of Avro-serializable objects.
@@ -45,7 +46,7 @@ import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
  * running in its own JVM, but it may well be a problem in any other kind of
  * multi-threaded context.
  */
-public abstract class AvroDeepCopier<T> implements Serializable {
+public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
 
   private BinaryEncoder binaryEncoder;
   private BinaryDecoder binaryDecoder;
@@ -67,7 +68,7 @@ public abstract class AvroDeepCopier<T> implements Serializable {
     private Class<T> valueClass;
 
     public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
-      super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader(schema));
+      super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader<T>(schema));
       this.valueClass = valueClass;
     }
 
@@ -113,6 +114,10 @@ public abstract class AvroDeepCopier<T> implements Serializable {
     }
   }
 
+  public static class AvroTupleDeepCopier {
+
+  }
+
   /**
    * Create a deep copy of an Avro value.
    * 
@@ -120,6 +125,7 @@ public abstract class AvroDeepCopier<T> implements Serializable {
    *          The value to be copied
    * @return The deep copy of the value
    */
+  @Override
   public T deepCopy(T source) {
     ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
     binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
@@ -127,7 +133,8 @@ public abstract class AvroDeepCopier<T> implements Serializable {
     try {
       datumWriter.write(source, binaryEncoder);
       binaryEncoder.flush();
-      binaryDecoder = DecoderFactory.get().binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+      binaryDecoder = DecoderFactory.get()
+          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
       datumReader.read(target, binaryDecoder);
     } catch (Exception e) {
       throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index b3ce576..8d7fbd5 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.types.avro;
 
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.avro.Schema;
@@ -25,11 +26,15 @@ import org.apache.avro.specific.SpecificRecord;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Tuple;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.io.avro.AvroFileSourceTarget;
+import org.apache.crunch.types.CollectionDeepCopier;
 import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.DeepCopier;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.TupleDeepCopier;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Preconditions;
@@ -50,13 +55,14 @@ public class AvroType<T> implements PType<T> {
   private final MapFn baseInputMapFn;
   private final MapFn baseOutputMapFn;
   private final List<PType> subTypes;
-  private AvroDeepCopier<T> deepCopier;
+  private DeepCopier<T> deepCopier;
 
   public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
     this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), ptypes);
   }
 
-  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, PType... ptypes) {
+  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
+      PType... ptypes) {
     this.typeClass = typeClass;
     this.schema = Preconditions.checkNotNull(schema);
     this.schemaString = schema.toString();
@@ -125,7 +131,8 @@ public class AvroType<T> implements PType<T> {
       return false;
     }
 
-    return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class.isAssignableFrom(typeClass));
+    return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class
+        .isAssignableFrom(typeClass));
   }
 
   public MapFn<Object, T> getInputMapFn() {
@@ -146,9 +153,13 @@ public class AvroType<T> implements PType<T> {
     return new AvroFileSourceTarget<T>(path, this);
   }
 
-  private AvroDeepCopier<T> getDeepCopier() {
+  private DeepCopier<T> getDeepCopier() {
     if (deepCopier == null) {
-      if (isSpecific()) {
+      if (Tuple.class.isAssignableFrom(this.typeClass)) {
+        deepCopier = new TupleDeepCopier(this);
+      } else if (Collection.class.isAssignableFrom(this.typeClass)){
+        deepCopier = new CollectionDeepCopier(this.subTypes.get(0));
+      } else if (isSpecific()) {
         deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema());
       } else if (isGeneric()) {
         deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema());
@@ -160,7 +171,8 @@ public class AvroType<T> implements PType<T> {
   }
 
   public T getDetachedValue(T value) {
-    if (this.baseInputMapFn instanceof IdentityFn && !Avros.isPrimitive(this)) {
+
+    if (!Avros.isPrimitive(this)) {
       return getDeepCopier().deepCopy(value);
     }
     return value;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index f8214a1..a6d7169 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -77,7 +77,8 @@ public class Avros {
   public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
 
   public static void configureReflectDataFactory(Configuration conf) {
-    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
+    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(),
+        ReflectDataFactory.class);
   }
 
   public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
@@ -109,8 +110,8 @@ public class Avros {
     }
   };
 
-  private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING),
-      UTF8_TO_STRING, STRING_TO_UTF8);
+  private static final AvroType<String> strings = new AvroType<String>(String.class,
+      Schema.create(Schema.Type.STRING), UTF8_TO_STRING, STRING_TO_UTF8);
   private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
   private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
   private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
@@ -120,9 +121,10 @@ public class Avros {
   private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
       Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance());
 
-  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
-      .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
-      .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
+      .<Class<?>, PType<?>> builder().put(String.class, strings).put(Long.class, longs)
+      .put(Integer.class, ints).put(Float.class, floats).put(Double.class, doubles)
+      .put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
 
   private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
 
@@ -206,8 +208,8 @@ public class Avros {
     public T map(ByteBuffer input) {
       T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration());
       try {
-        instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input.arrayOffset(), input
-            .limit())));
+        instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input
+            .arrayOffset(), input.limit())));
       } catch (IOException e) {
         LOG.error("Exception thrown reading instance of: " + writableClazz, e);
       }
@@ -232,8 +234,8 @@ public class Avros {
   }
 
   public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
-    return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
-        new WritableToBytesMapFn<T>());
+    return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(
+        clazz), new WritableToBytesMapFn<T>());
   }
 
   private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
@@ -277,7 +279,8 @@ public class Avros {
     }
   }
 
-  private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
+  private static class CollectionToGenericDataArray extends
+      MapFn<Collection<?>, GenericData.Array<?>> {
 
     private final MapFn mapFn;
     private final String jsonSchema;
@@ -319,8 +322,10 @@ public class Avros {
   public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
     AvroType<T> avroType = (AvroType<T>) ptype;
     Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
-    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
-    CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
+    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(
+        avroType.getInputMapFn());
+    CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema,
+        avroType.getOutputMapFn());
     return new AvroType(Collection.class, collectionSchema, input, output, ptype);
   }
 
@@ -520,23 +525,25 @@ public class Avros {
     return new AvroType(Pair.class, schema, input, output, p1, p2);
   }
 
-  public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
+  public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2,
+      PType<V3> p3) {
     Schema schema = createTupleSchema(p1, p2, p3);
-    return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
-        new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3);
+    return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2,
+        p3), new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3);
   }
 
-  public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3,
-      PType<V4> p4) {
+  public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
+      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
     Schema schema = createTupleSchema(p1, p2, p3, p4);
-    return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
-        new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4);
+    return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2,
+        p3, p4), new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4);
   }
 
   public static final AvroType<TupleN> tuples(PType... ptypes) {
     Schema schema = createTupleSchema(ptypes);
-    return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
-        new TupleToGenericRecord(schema, ptypes), ptypes);
+    return new AvroType(TupleN.class, schema,
+        new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes), new TupleToGenericRecord(schema,
+            ptypes), ptypes);
   }
 
   public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
@@ -546,8 +553,8 @@ public class Avros {
       typeArgs[i] = ptypes[i].getTypeClass();
     }
     TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
-    return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema,
-        ptypes), ptypes);
+    return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes),
+        new TupleToGenericRecord(schema, ptypes), ptypes);
   }
 
   private static Schema createTupleSchema(PType<?>... ptypes) {
@@ -564,11 +571,12 @@ public class Avros {
     return schema;
   }
 
-  public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
-      PType<S> base) {
+  public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
+      MapFn<T, S> outputFn, PType<S> base) {
     AvroType<S> abase = (AvroType<S>) base;
-    return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn),
-        new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray(new PType[0]));
+    return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(),
+        inputFn), new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray(
+        new PType[0]));
   }
 
   public static <T> PType<T> jsons(Class<T> clazz) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
new file mode 100644
index 0000000..7e532e8
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
@@ -0,0 +1,31 @@
+package org.apache.crunch.types;
+
+import static org.junit.Assert.*;
+
+import java.util.Collection;
+
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class CollectionDeepCopierTest {
+
+  @Test
+  public void testDeepCopy() {
+    Person person = new Person();
+    person.setAge(42);
+    person.setName("John Smith");
+    person.setSiblingnames(Lists.<CharSequence> newArrayList());
+
+    Collection<Person> personCollection = Lists.newArrayList(person);
+    CollectionDeepCopier<Person> collectionDeepCopier = new CollectionDeepCopier<Person>(Avros.records(Person.class));
+
+    Collection<Person> deepCopyCollection = collectionDeepCopier.deepCopy(personCollection);
+
+    assertEquals(personCollection, deepCopyCollection);
+    assertNotSame(personCollection.iterator().next(), deepCopyCollection.iterator().next());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
new file mode 100644
index 0000000..8a3a12f
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TupleDeepCopierTest {
+
+  @Test
+  public void testDeepCopy_Pair() {
+    Person person = new Person();
+    person.setName("John Doe");
+    person.setAge(42);
+    person.setSiblingnames(Lists.<CharSequence> newArrayList());
+
+    Pair<Integer, Person> inputPair = Pair.of(1, person);
+    DeepCopier<Pair<Integer, Person>> deepCopier = new TupleDeepCopier<Pair<Integer, Person>>(
+        Avros.pairs(Avros.ints(), Avros.records(Person.class)));
+
+    Pair<Integer, Person> deepCopyPair = deepCopier.deepCopy(inputPair);
+
+    assertEquals(inputPair, deepCopyPair);
+    assertNotSame(inputPair.second(), deepCopyPair.second());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
new file mode 100644
index 0000000..25b0371
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.junit.Test;
+
+public class TupleFactoryTest {
+
+  @Test
+  public void testGetTupleFactory_Pair() {
+    assertEquals(TupleFactory.PAIR, TupleFactory.getTupleFactory(Pair.class));
+  }
+
+  @Test
+  public void testGetTupleFactory_Tuple3() {
+    assertEquals(TupleFactory.TUPLE3, TupleFactory.getTupleFactory(Tuple3.class));
+  }
+
+  @Test
+  public void testGetTupleFactory_Tuple4() {
+    assertEquals(TupleFactory.TUPLE4, TupleFactory.getTupleFactory(Tuple4.class));
+  }
+
+  @Test
+  public void testGetTupleFactory_TupleN() {
+    assertEquals(TupleFactory.TUPLEN, TupleFactory.getTupleFactory(TupleN.class));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetTupleFactory_CustomTupleClass() {
+    TupleFactory.getTupleFactory(CustomTupleImplementation.class);
+  }
+
+  private static class CustomTupleImplementation implements Tuple {
+
+    @Override
+    public Object get(int index) {
+      return null;
+    }
+
+    @Override
+    public int size() {
+      return 0;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
index d966dfe..fa1b4c4 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -36,7 +36,8 @@ public class AvroDeepCopierTest {
     person.setAge(42);
     person.setSiblingnames(Lists.<CharSequence> newArrayList());
 
-    Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$).deepCopy(person);
+    Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
+        .deepCopy(person);
 
     assertEquals(person, deepCopyPerson);
     assertNotSame(person, deepCopyPerson);
@@ -49,7 +50,8 @@ public class AvroDeepCopierTest {
     record.put("age", 42);
     record.put("siblingnames", Lists.newArrayList());
 
-    Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(record);
+    Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$)
+        .deepCopy(record);
 
     assertEquals(record, deepCopyRecord);
     assertNotSame(record, deepCopyRecord);
@@ -62,8 +64,8 @@ public class AvroDeepCopierTest {
     person.setAge(42);
     person.setSiblingnames(Lists.<CharSequence> newArrayList());
 
-    Person deepCopyPerson = new AvroDeepCopier.AvroReflectDeepCopier<Person>(Person.class, Person.SCHEMA$)
-        .deepCopy(person);
+    Person deepCopyPerson = new AvroDeepCopier.AvroReflectDeepCopier<Person>(Person.class,
+        Person.SCHEMA$).deepCopy(person);
 
     assertEquals(person, deepCopyPerson);
     assertNotSame(person, deepCopyPerson);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
index 2a80a5e..486bd1a 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
@@ -23,8 +23,12 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.crunch.Pair;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
 import org.junit.Test;
@@ -152,15 +156,19 @@ public class AvroTypeTest {
     assertEquals(record, detachedRecord);
     assertNotSame(record, detachedRecord);
   }
-
-  @Test
-  public void testGetDetachedValue_SpecificAvroType() {
-    AvroType<Person> specificType = Avros.records(Person.class);
+  
+  private Person createPerson(){ // Test fixture: a Person with name, age and an empty sibling-name list set.
     Person person = new Person();
     person.setName("name value");
     person.setAge(42);
     person.setSiblingnames(Lists.<CharSequence> newArrayList());
+    return person;
+  }
 
+  @Test
+  public void testGetDetachedValue_SpecificAvroType() {
+    AvroType<Person> specificType = Avros.records(Person.class);
+    Person person = createPerson();
     Person detachedPerson = specificType.getDetachedValue(person);
     assertEquals(person, detachedPerson);
     assertNotSame(person, detachedPerson);
@@ -169,14 +177,38 @@ public class AvroTypeTest {
   @Test
   public void testGetDetachedValue_ReflectAvroType() {
     AvroType<Person> reflectType = Avros.reflects(Person.class);
-    Person person = new Person();
-    person.setName("name value");
-    person.setAge(42);
-    person.setSiblingnames(Lists.<CharSequence> newArrayList());
-
+    Person person = createPerson(); // Fixture from createPerson(); the detached value must be equal but not the same instance.
     Person detachedPerson = reflectType.getDetachedValue(person);
     assertEquals(person, detachedPerson);
     assertNotSame(person, detachedPerson);
   }
 
 
+  @Test
+  public void testGetDetachedValue_Pair() { // Detaching a Pair must also copy the Person record it carries.
+    Person person = createPerson();
+    AvroType<Pair<Integer, Person>> pairType = Avros.pairs(Avros.ints(),
+        Avros.records(Person.class));
+
+    Pair<Integer, Person> inputPair = Pair.of(1, person);
+    Pair<Integer, Person> detachedPair = pairType.getDetachedValue(inputPair);
+
+    assertEquals(inputPair, detachedPair); // The detached pair compares equal to the input...
+    assertNotSame(inputPair.second(), detachedPair.second()); // ...but its Person is a different object.
+  }
+  
+  @Test
+  public void testGetDetachedValue_Collection(){ // Detaching a Collection must also copy the Person elements inside it.
+    Person person = createPerson();
+    List<Person> personList = Lists.newArrayList(person);
+    
+    AvroType<Collection<Person>> collectionType = Avros.collections(Avros.records(Person.class));
+    
+    Collection<Person> detachedCollection = collectionType.getDetachedValue(personList);
+    
+    assertEquals(personList, detachedCollection); // The detached collection compares equal to the input list...
+    Person detachedPerson = detachedCollection.iterator().next(); // ...so inspect its first (only) element.
+    
+    assertNotSame(person, detachedPerson); // The element must be a different object from the original Person.
+  }
+
 }