You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/08/16 07:09:06 UTC

git commit: CRUNCH-45 - Fix deep copying on Writables

Updated Branches:
  refs/heads/master e487447b0 -> 2932e9e4a


CRUNCH-45 - Fix deep copying on Writables

Correct broken getDetachedValue for Writable tuples, collections,
and maps. Also refactor Writable deep copying to use MapFns that
are already present.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2932e9e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2932e9e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2932e9e4

Branch: refs/heads/master
Commit: 2932e9e4a053414e961b77e419a8b1d07ee2daa2
Parents: e487447
Author: Gabriel Reid <gr...@apache.org>
Authored: Wed Aug 15 09:30:32 2012 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Thu Aug 16 06:56:37 2012 +0200

----------------------------------------------------------------------
 .../java/org/apache/crunch/types/DeepCopier.java   |   13 ++-
 .../org/apache/crunch/types/TupleDeepCopier.java   |    8 +-
 .../apache/crunch/types/avro/AvroDeepCopier.java   |   74 +++++++++--
 .../apache/crunch/types/avro/AvroTableType.java    |    3 +-
 .../org/apache/crunch/types/avro/AvroType.java     |   38 +-----
 .../java/org/apache/crunch/types/avro/Avros.java   |   95 ++++++++-------
 .../crunch/types/writable/WritableDeepCopier.java  |   60 +++++++++
 .../apache/crunch/types/writable/WritableType.java |   16 +--
 .../apache/crunch/types/writable/Writables.java    |   36 +-----
 .../org/apache/crunch/types/PTypeUtilsTest.java    |    2 +-
 .../apache/crunch/types/TupleDeepCopierTest.java   |    4 +-
 .../crunch/types/avro/AvroDeepCopierTest.java      |    1 +
 .../org/apache/crunch/types/avro/AvroTypeTest.java |   14 ++-
 .../org/apache/crunch/types/avro/AvrosTest.java    |    5 +-
 .../types/writable/WritableDeepCopierTest.java     |   46 +++++++
 .../crunch/types/writable/WritableTypeTest.java    |   53 +++++++-
 .../crunch/types/writable/WritablesTest.java       |    7 -
 17 files changed, 316 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
index 539b808..a96e7bf 100644
--- a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
@@ -17,13 +17,15 @@
  */
 package org.apache.crunch.types;
 
+import java.io.Serializable;
+
 /**
  * Performs deep copies of values.
  * 
  * @param <T>
  *          The type of value that will be copied
  */
-public interface DeepCopier<T> {
+public interface DeepCopier<T> extends Serializable {
 
   /**
    * Create a deep copy of a value.
@@ -33,5 +35,14 @@ public interface DeepCopier<T> {
    * @return The deep copy of the value
    */
   T deepCopy(T source);
+  
+  static class NoOpDeepCopier<V> implements DeepCopier<V> {
 
+    @Override
+    public V deepCopy(V source) {
+      return source;
+    }
+    
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
index 094b582..4f473a0 100644
--- a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
@@ -21,6 +21,8 @@ import java.util.List;
 
 import org.apache.crunch.Tuple;
 
+import com.google.common.collect.Lists;
+
 /**
  * Performs deep copies (based on underlying PType deep copying) of Tuple-based
  * objects.
@@ -33,9 +35,9 @@ public class TupleDeepCopier<T extends Tuple> implements DeepCopier<T> {
   private final TupleFactory<T> tupleFactory;
   private final List<PType> elementTypes;
 
-  public TupleDeepCopier(PType<T> ptype) {
-    tupleFactory = TupleFactory.getTupleFactory(ptype.getTypeClass());
-    elementTypes = ptype.getSubTypes();
+  public TupleDeepCopier(Class<T> tupleClass, PType...elementTypes) {
+    tupleFactory = TupleFactory.getTupleFactory(tupleClass);
+    this.elementTypes = Lists.newArrayList(elementTypes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index ad5ba04..fe4fe1a 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -48,18 +48,31 @@ import org.apache.crunch.types.DeepCopier;
  */
 public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
 
+  private String jsonSchema;
+  private transient Schema schema;
   private BinaryEncoder binaryEncoder;
   private BinaryDecoder binaryDecoder;
-  protected DatumWriter<T> datumWriter;
-  protected DatumReader<T> datumReader;
 
-  protected AvroDeepCopier(DatumWriter<T> datumWriter, DatumReader<T> datumReader) {
-    this.datumWriter = datumWriter;
-    this.datumReader = datumReader;
+  private transient DatumWriter<T> datumWriter;
+  private transient DatumReader<T> datumReader;
+
+  public AvroDeepCopier(Schema schema) {
+    this.jsonSchema = schema.toString();
+  }
+
+  protected Schema getSchema() {
+    if (schema == null) {
+      schema = new Schema.Parser().parse(jsonSchema);
+    }
+    return schema;
   }
 
   protected abstract T createCopyTarget();
 
+  protected abstract DatumWriter<T> createDatumWriter();
+
+  protected abstract DatumReader<T> createDatumReader();
+
   /**
    * Deep copier for Avro specific data objects.
    */
@@ -68,7 +81,7 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
     private Class<T> valueClass;
 
     public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
-      super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader<T>(schema));
+      super(schema);
       this.valueClass = valueClass;
     }
 
@@ -77,6 +90,16 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
       return createNewInstance(valueClass);
     }
 
+    @Override
+    protected DatumWriter<T> createDatumWriter() {
+      return new SpecificDatumWriter<T>(getSchema());
+    }
+
+    @Override
+    protected DatumReader<T> createDatumReader() {
+      return new SpecificDatumReader<T>(getSchema());
+    }
+
   }
 
   /**
@@ -84,16 +107,25 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
    */
   public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
 
-    private Schema schema;
+    private transient Schema schema;
 
     public AvroGenericDeepCopier(Schema schema) {
-      super(new GenericDatumWriter<Record>(schema), new GenericDatumReader<Record>(schema));
-      this.schema = schema;
+      super(schema);
     }
 
     @Override
     protected Record createCopyTarget() {
-      return new GenericData.Record(schema);
+      return new GenericData.Record(getSchema());
+    }
+
+    @Override
+    protected DatumReader<Record> createDatumReader() {
+      return new GenericDatumReader<Record>(getSchema());
+    }
+
+    @Override
+    protected DatumWriter<Record> createDatumWriter() {
+      return new GenericDatumWriter<Record>(getSchema());
     }
   }
 
@@ -101,10 +133,11 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
    * Deep copier for Avro reflect data objects.
    */
   public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
+
     private Class<T> valueClass;
 
     public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
-      super(new ReflectDatumWriter<T>(schema), new ReflectDatumReader<T>(schema));
+      super(schema);
       this.valueClass = valueClass;
     }
 
@@ -112,6 +145,16 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
     protected T createCopyTarget() {
       return createNewInstance(valueClass);
     }
+
+    @Override
+    protected DatumReader<T> createDatumReader() {
+      return new ReflectDatumReader<T>(getSchema());
+    }
+
+    @Override
+    protected DatumWriter<T> createDatumWriter() {
+      return new ReflectDatumWriter<T>(getSchema());
+    }
   }
 
   public static class AvroTupleDeepCopier {
@@ -127,14 +170,19 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
    */
   @Override
   public T deepCopy(T source) {
+    if (datumReader == null) {
+      datumReader = createDatumReader();
+    }
+    if (datumWriter == null) {
+      datumWriter = createDatumWriter();
+    }
     ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
     binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
     T target = createCopyTarget();
     try {
       datumWriter.write(source, binaryEncoder);
       binaryEncoder.flush();
-      binaryDecoder = DecoderFactory.get()
-          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+      binaryDecoder = DecoderFactory.get().binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
       datumReader.read(target, binaryDecoder);
     } catch (Exception e) {
       throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index eb26bf1..bd4b14c 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -25,6 +25,7 @@ import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleDeepCopier;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -121,7 +122,7 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT
   public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
     super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), valueType.getSchema()),
         new IndexedRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()), new PairToAvroPair(keyType,
-            valueType), keyType, valueType);
+            valueType), new TupleDeepCopier(Pair.class, keyType, valueType), keyType, valueType);
     this.keyType = keyType;
     this.valueType = valueType;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index 82c4c91..6e35f5b 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -17,9 +17,7 @@
  */
 package org.apache.crunch.types.avro;
 
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -27,16 +25,12 @@ import org.apache.avro.specific.SpecificRecord;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.SourceTarget;
-import org.apache.crunch.Tuple;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.io.avro.AvroFileSourceTarget;
-import org.apache.crunch.types.CollectionDeepCopier;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.DeepCopier;
-import org.apache.crunch.types.MapDeepCopier;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.TupleDeepCopier;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Preconditions;
@@ -59,17 +53,18 @@ public class AvroType<T> implements PType<T> {
   private final List<PType> subTypes;
   private DeepCopier<T> deepCopier;
 
-  public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
-    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), ptypes);
+  public AvroType(Class<T> typeClass, Schema schema, DeepCopier<T> deepCopier, PType... ptypes) {
+    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes);
   }
 
-  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
+  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, DeepCopier<T> deepCopier,
       PType... ptypes) {
     this.typeClass = typeClass;
     this.schema = Preconditions.checkNotNull(schema);
     this.schemaString = schema.toString();
     this.baseInputMapFn = inputMapFn;
     this.baseOutputMapFn = outputMapFn;
+    this.deepCopier = deepCopier;
     this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
   }
 
@@ -155,31 +150,8 @@ public class AvroType<T> implements PType<T> {
     return new AvroFileSourceTarget<T>(path, this);
   }
 
-  private DeepCopier<T> getDeepCopier() {
-    if (deepCopier == null) {
-      if (Tuple.class.isAssignableFrom(this.typeClass)) {
-        deepCopier = new TupleDeepCopier(this);
-      } else if (Map.class.isAssignableFrom(this.typeClass)){
-        deepCopier = new MapDeepCopier(this.subTypes.get(0));
-      } else if (Collection.class.isAssignableFrom(this.typeClass)){
-        deepCopier = new CollectionDeepCopier(this.subTypes.get(0));
-      } else if (isSpecific()) {
-        deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema());
-      } else if (isGeneric()) {
-        deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema());
-      } else {
-        deepCopier = new AvroDeepCopier.AvroReflectDeepCopier<T>(typeClass, getSchema());
-      }
-    }
-    return deepCopier;
-  }
-
   public T getDetachedValue(T value) {
-
-    if (!Avros.isPrimitive(this)) {
-      return getDeepCopier().deepCopy(value);
-    }
-    return value;
+    return deepCopier.deepCopy(value);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index 00a297c..038f805 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -44,9 +44,14 @@ import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
 import org.apache.crunch.fn.CompositeMapFn;
 import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.types.CollectionDeepCopier;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.crunch.types.MapDeepCopier;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleDeepCopier;
 import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.types.writable.WritableDeepCopier;
 import org.apache.crunch.util.PTypes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -77,8 +82,7 @@ public class Avros {
   public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
 
   public static void configureReflectDataFactory(Configuration conf) {
-    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(),
-        ReflectDataFactory.class);
+    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
   }
 
   public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
@@ -110,8 +114,8 @@ public class Avros {
     }
   };
 
-  private static final AvroType<String> strings = new AvroType<String>(String.class,
-      Schema.create(Schema.Type.STRING), UTF8_TO_STRING, STRING_TO_UTF8);
+  private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING),
+      UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier<String>());
   private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
   private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
   private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
@@ -119,12 +123,11 @@ public class Avros {
   private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE);
   private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
   private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
-      Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance());
+      Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), new DeepCopier.NoOpDeepCopier<ByteBuffer>());
 
-  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
-      .<Class<?>, PType<?>> builder().put(String.class, strings).put(Long.class, longs)
-      .put(Integer.class, ints).put(Float.class, floats).put(Double.class, doubles)
-      .put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
+      .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
+      .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
 
   private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
 
@@ -141,7 +144,7 @@ public class Avros {
   }
 
   private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
-    return new AvroType<T>(clazz, Schema.create(schemaType));
+    return new AvroType<T>(clazz, Schema.create(schemaType), new DeepCopier.NoOpDeepCopier<T>());
   }
 
   public static final AvroType<Void> nulls() {
@@ -184,7 +187,8 @@ public class Avros {
   }
 
   public static final AvroType<GenericData.Record> generics(Schema schema) {
-    return new AvroType<GenericData.Record>(GenericData.Record.class, schema);
+    return new AvroType<GenericData.Record>(GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier(
+        schema));
   }
 
   public static final <T> AvroType<T> containers(Class<T> clazz) {
@@ -192,7 +196,8 @@ public class Avros {
   }
 
   public static final <T> AvroType<T> reflects(Class<T> clazz) {
-    return new AvroType<T>(clazz, REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz));
+    Schema schema = REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz);
+    return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema));
   }
 
   private static class BytesToWritableMapFn<T extends Writable> extends MapFn<ByteBuffer, T> {
@@ -208,8 +213,8 @@ public class Avros {
     public T map(ByteBuffer input) {
       T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration());
       try {
-        instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input
-            .arrayOffset(), input.limit())));
+        instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input.arrayOffset(), input
+            .limit())));
       } catch (IOException e) {
         LOG.error("Exception thrown reading instance of: " + writableClazz, e);
       }
@@ -234,8 +239,8 @@ public class Avros {
   }
 
   public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
-    return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(
-        clazz), new WritableToBytesMapFn<T>());
+    return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
+        new WritableToBytesMapFn<T>(), new WritableDeepCopier<T>(clazz));
   }
 
   private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
@@ -279,8 +284,7 @@ public class Avros {
     }
   }
 
-  private static class CollectionToGenericDataArray extends
-      MapFn<Collection<?>, GenericData.Array<?>> {
+  private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
 
     private final MapFn mapFn;
     private final String jsonSchema;
@@ -322,11 +326,9 @@ public class Avros {
   public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
     AvroType<T> avroType = (AvroType<T>) ptype;
     Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
-    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(
-        avroType.getInputMapFn());
-    CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema,
-        avroType.getOutputMapFn());
-    return new AvroType(Collection.class, collectionSchema, input, output, ptype);
+    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
+    CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
+    return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier<T>(ptype), ptype);
   }
 
   private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> {
@@ -398,7 +400,7 @@ public class Avros {
     Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
     AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
     MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn());
-    return new AvroType(Map.class, mapSchema, inputFn, outputFn, ptype);
+    return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier<T>(ptype), ptype);
   }
 
   private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> {
@@ -459,7 +461,7 @@ public class Avros {
     private final String jsonSchema;
     private final boolean isReflect;
     private transient Schema schema;
-    
+
     public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
       this.fns = Lists.newArrayList();
       this.avroTypes = Lists.newArrayList();
@@ -497,13 +499,13 @@ public class Avros {
         fn.setContext(getContext());
       }
     }
-    
-    private GenericRecord createRecord(){
+
+    private GenericRecord createRecord() {
       if (isReflect) {
         return new ReflectGenericRecord(schema);
       } else {
         return new GenericData.Record(schema);
-      }      
+      }
     }
 
     @Override
@@ -525,28 +527,27 @@ public class Avros {
     Schema schema = createTupleSchema(p1, p2);
     GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2);
     TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
-    return new AvroType(Pair.class, schema, input, output, p1, p2);
+    return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), p1, p2);
   }
 
-  public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2,
-      PType<V3> p3) {
+  public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
     Schema schema = createTupleSchema(p1, p2, p3);
-    return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2,
-        p3), new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3);
+    return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
+        new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), p1, p2, p3);
   }
 
-  public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+  public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3,
+      PType<V4> p4) {
     Schema schema = createTupleSchema(p1, p2, p3, p4);
-    return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2,
-        p3, p4), new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4);
+    return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
+        new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), p1, p2,
+        p3, p4);
   }
 
   public static final AvroType<TupleN> tuples(PType... ptypes) {
     Schema schema = createTupleSchema(ptypes);
-    return new AvroType(TupleN.class, schema,
-        new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes), new TupleToGenericRecord(schema,
-            ptypes), ptypes);
+    return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
+        new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), ptypes);
   }
 
   public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
@@ -556,8 +557,8 @@ public class Avros {
       typeArgs[i] = ptypes[i].getTypeClass();
     }
     TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
-    return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes),
-        new TupleToGenericRecord(schema, ptypes), ptypes);
+    return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema,
+        ptypes), new TupleDeepCopier(clazz, ptypes), ptypes);
   }
 
   private static Schema createTupleSchema(PType<?>... ptypes) {
@@ -574,12 +575,12 @@ public class Avros {
     return schema;
   }
 
-  public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
-      MapFn<T, S> outputFn, PType<S> base) {
+  public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
+      PType<S> base) {
     AvroType<S> abase = (AvroType<S>) base;
-    return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(),
-        inputFn), new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray(
-        new PType[0]));
+    return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn),
+        new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier<T>(), base.getSubTypes()
+            .toArray(new PType[0]));
   }
 
   public static <T> PType<T> jsons(Class<T> clazz) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
new file mode 100644
index 0000000..6469208
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * Performs deep copies of Writable values.
+ * @param <T> The type of Writable that can be copied
+ */
+public class WritableDeepCopier<T extends Writable> implements DeepCopier<T>{
+  
+  private Class<T> writableClass;
+
+  public WritableDeepCopier(Class<T> writableClass){
+    this.writableClass = writableClass;
+  }
+
+  @Override
+  public T deepCopy(T source) {
+    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
+    T copiedValue = null;
+    try {
+      source.write(dataOut);
+      dataOut.flush();
+      ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+      DataInput dataInput = new DataInputStream(byteInStream);
+      copiedValue = writableClass.newInstance();
+      copiedValue.readFields(dataInput);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Error while deep copying " + source, e);
+    }
+    return copiedValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
index 45502a3..23c95ea 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -22,9 +22,9 @@ import java.util.List;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.SourceTarget;
-import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.io.seq.SeqFileSourceTarget;
 import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.DeepCopier;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +39,7 @@ public class WritableType<T, W extends Writable> implements PType<T> {
   private final Converter converter;
   private final MapFn<W, T> inputFn;
   private final MapFn<T, W> outputFn;
+  private final DeepCopier<W> deepCopier;
   private final List<PType> subTypes;
 
   WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> inputDoFn, MapFn<T, W> outputDoFn,
@@ -48,6 +49,7 @@ public class WritableType<T, W extends Writable> implements PType<T> {
     this.inputFn = inputDoFn;
     this.outputFn = outputDoFn;
     this.converter = new WritableValueConverter(writableClass);
+    this.deepCopier = new WritableDeepCopier<W>(writableClass);
     this.subTypes = ImmutableList.<PType> builder().add(subTypes).build();
   }
 
@@ -99,17 +101,11 @@ public class WritableType<T, W extends Writable> implements PType<T> {
     return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) && subTypes.equals(wt.subTypes));
   }
 
-  // Unchecked warnings are suppressed because we know that W and T are the same
-  // type (due to the IdentityFn being used)
-  @SuppressWarnings("unchecked")
   @Override
   public T getDetachedValue(T value) {
-    if (this.inputFn.getClass().equals(IdentityFn.class)) {
-      W writableValue = (W) value;
-      return (T) Writables.deepCopy(writableValue, this.writableClass);
-    } else {
-      return value;
-    }
+    W writableValue = outputFn.map(value);
+    W deepCopy = this.deepCopier.deepCopy(writableValue);
+    return inputFn.map(deepCopy);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
index f4906b7..23bc7f5 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -17,11 +17,6 @@
  */
 package org.apache.crunch.types.writable;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
@@ -35,8 +30,11 @@ import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
 import org.apache.crunch.fn.CompositeMapFn;
 import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.types.CollectionDeepCopier;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.crunch.types.MapDeepCopier;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleDeepCopier;
 import org.apache.crunch.types.TupleFactory;
 import org.apache.crunch.util.PTypes;
 import org.apache.hadoop.conf.Configuration;
@@ -582,31 +580,7 @@ public class Writables {
     return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());
   }
 
-  /**
-   * Perform a deep copy of a writable value.
-   * 
-   * @param value
-   *          The value to be copied
-   * @param writableClass
-   *          The Writable class of the value to be copied
-   * @return A fully detached deep copy of the input value
-   */
-  public static <T extends Writable> T deepCopy(T value, Class<T> writableClass) {
-    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
-    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
-    T copiedValue = null;
-    try {
-      value.write(dataOut);
-      dataOut.flush();
-      ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
-      DataInput dataInput = new DataInputStream(byteInStream);
-      copiedValue = writableClass.newInstance();
-      copiedValue.readFields(dataInput);
-    } catch (Exception e) {
-      throw new CrunchRuntimeException("Error while deep copying " + value, e);
-    }
-    return copiedValue;
-  }
+
 
   // Not instantiable
   private Writables() {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java b/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
index f9217d5..e6fd90c 100644
--- a/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
@@ -77,7 +77,7 @@ public class PTypeUtilsTest {
 
   @Test
   public void testAvroRegistered() {
-    AvroType<Utf8> at = new AvroType<Utf8>(Utf8.class, Schema.create(Schema.Type.STRING));
+    AvroType<Utf8> at = new AvroType<Utf8>(Utf8.class, Schema.create(Schema.Type.STRING), new DeepCopier.NoOpDeepCopier<Utf8>());
     Avros.register(Utf8.class, at);
     assertEquals(at, Avros.records(Utf8.class));
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
index 8a3a12f..017a813 100644
--- a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
@@ -37,8 +37,8 @@ public class TupleDeepCopierTest {
     person.setSiblingnames(Lists.<CharSequence> newArrayList());
 
     Pair<Integer, Person> inputPair = Pair.of(1, person);
-    DeepCopier<Pair<Integer, Person>> deepCopier = new TupleDeepCopier<Pair<Integer, Person>>(
-        Avros.pairs(Avros.ints(), Avros.records(Person.class)));
+    DeepCopier<Pair> deepCopier = new TupleDeepCopier<Pair>(
+        Pair.class, Avros.ints(), Avros.records(Person.class));
 
     Pair<Integer, Person> deepCopyPair = deepCopier.deepCopy(inputPair);
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
index fa1b4c4..6e2d89e 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -20,6 +20,7 @@ package org.apache.crunch.types.avro;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
index 092b89e..28777f5 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.crunch.Pair;
+import org.apache.crunch.TupleN;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
 import org.junit.Test;
@@ -228,5 +229,16 @@ public class AvroTypeTest {
     assertEquals(stringPersonMap, detachedMap);
     assertNotSame(value, detachedMap.get(key));
   }
-
+  
+  @Test
+  public void testGetDetachedValue_TupleN(){ // detaching a TupleN must also copy the record it holds
+    Person person = createPerson();
+    AvroType<TupleN> ptype = Avros.tuples(Avros.records(Person.class));
+    TupleN tuple = new TupleN(person);
+    TupleN detachedTuple = ptype.getDetachedValue(tuple);
+    
+    assertEquals(tuple, detachedTuple); // the detached tuple is equal to the original
+    assertNotSame(person, detachedTuple.get(0)); // but element 0 is a different Person instance
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
index aec19f2..6e66f74 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
@@ -17,10 +17,10 @@
  */
 package org.apache.crunch.types.avro;
 
-import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
@@ -39,6 +39,7 @@ import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.DeepCopier;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.io.IntWritable;
@@ -224,7 +225,7 @@ public class AvrosTest {
   
   @Test
   public void testIsPrimitive_TruePrimitiveValue(){
-    AvroType truePrimitiveAvroType = new AvroType(int.class, Schema.create(Type.INT));
+    AvroType truePrimitiveAvroType = new AvroType(int.class, Schema.create(Type.INT), new DeepCopier.NoOpDeepCopier());
     assertTrue(Avros.isPrimitive(truePrimitiveAvroType));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
new file mode 100644
index 0000000..a691df2
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class WritableDeepCopierTest { // exercises WritableDeepCopier using Hadoop Text values
+
+  private WritableDeepCopier<Text> deepCopier; // copier under test, rebuilt in setUp()
+  
+  @Before
+  public void setUp(){
+    deepCopier = new WritableDeepCopier<Text>(Text.class);
+  }
+  
+  @Test
+  public void testDeepCopy(){
+    Text text = new Text("value");
+    Text deepCopy = deepCopier.deepCopy(text);
+    
+    assertEquals(text, deepCopy); // the copy is equal to the source
+    assertNotSame(text, deepCopy); // and is a distinct object, not the same reference
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
index ea0d11a..51a87f5 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
@@ -20,18 +20,20 @@ package org.apache.crunch.types.writable;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
 
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.Pair;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
-public class WritableTypeTest {
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
-  @Test
-  public void testGetDetachedValue_AlreadyMappedWritable() {
-    WritableType<String, Text> stringType = Writables.strings();
-    String value = "test";
-    assertSame(value, stringType.getDetachedValue(value));
-  }
+public class WritableTypeTest {
 
   @Test
   public void testGetDetachedValue_CustomWritable() {
@@ -43,4 +45,41 @@ public class WritableTypeTest {
     assertNotSame(value, detachedValue);
   }
 
+  @Test
+  public void testGetDetachedValue_Collection() { // detaching a collection must copy its elements
+    Collection<Text> textCollection = Lists.newArrayList(new Text("value"));
+    WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables.collections(Writables
+        .writables(Text.class));
+
+    Collection<Text> detachedCollection = ptype.getDetachedValue(textCollection);
+    assertEquals(textCollection, detachedCollection); // same contents as the original
+    assertNotSame(textCollection.iterator().next(), detachedCollection.iterator().next()); // new element
+  }
+
+  @Test
+  public void testGetDetachedValue_Tuple() { // detaching a Pair must copy both of its elements
+    Pair<Text, Text> textPair = Pair.of(new Text("one"), new Text("two"));
+    WritableType<Pair<Text, Text>, TupleWritable> ptype = Writables.pairs(Writables.writables(Text.class),
+        Writables.writables(Text.class));
+    ptype.getOutputMapFn().initialize(); // NOTE(review): presumably the tuple MapFns need explicit
+    ptype.getInputMapFn().initialize(); // initialization before map() is called - confirm
+
+    Pair<Text, Text> detachedPair = ptype.getDetachedValue(textPair);
+    assertEquals(textPair, detachedPair); // same contents as the original pair
+    assertNotSame(textPair.first(), detachedPair.first()); // first element is a new instance
+    assertNotSame(textPair.second(), detachedPair.second()); // second element is a new instance
+  }
+
+  @Test
+  public void testGetDetachedValue_Map() { // detaching a map must copy its values
+    Map<String, Text> stringTextMap = Maps.newHashMap();
+    stringTextMap.put("key", new Text("value"));
+
+    WritableType<Map<String, Text>, MapWritable> ptype = Writables.maps(Writables.writables(Text.class));
+    Map<String, Text> detachedMap = ptype.getDetachedValue(stringTextMap);
+
+    assertEquals(stringTextMap, detachedMap); // same entries as the original map
+    assertNotSame(stringTextMap.get("key"), detachedMap.get("key")); // value is a new instance
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
index a8699b3..5396fba 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
@@ -253,11 +253,4 @@ public class WritablesTest {
     assertEquals(writable, ptype.getOutputMapFn().map(java));
   }
 
-  @Test
-  public void testDeepCopy() {
-    Text text = new Text("Test");
-    Text copiedText = Writables.deepCopy(text, Text.class);
-    assertEquals(text, copiedText);
-    assertNotSame(text, copiedText);
-  }
 }