You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/11/26 04:27:10 UTC

git commit: Second cut at rewriting custom Writable types to a more compact format.

Updated Branches:
  refs/heads/master 2a8b6c149 -> c51ef57ae


Second cut at rewriting custom Writable types to a more compact format.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/c51ef57a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/c51ef57a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/c51ef57a

Branch: refs/heads/master
Commit: c51ef57aeecb78901e7489728367c2921f4c08d7
Parents: 2a8b6c1
Author: Josh Wills <jw...@apache.org>
Authored: Wed Feb 27 21:34:44 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Nov 21 19:50:32 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Sort.java   |   7 +-
 .../lib/sort/TupleWritableComparator.java       |  66 ++++++----
 .../types/writable/GenericArrayWritable.java    |  35 ++----
 .../crunch/types/writable/TextMapWritable.java  |  41 ++-----
 .../crunch/types/writable/TupleWritable.java    |  65 +++++-----
 .../apache/crunch/types/writable/Writables.java | 119 ++++++++++++-------
 .../lib/TupleWritablePartitionerTest.java       |  32 +----
 .../writable/GenericArrayWritableTest.java      |  25 ++--
 .../crunch/types/writable/WritableTypeTest.java |   2 +-
 .../crunch/types/writable/WritablesTest.java    |  37 ++++--
 10 files changed, 229 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
index 94ce7d8..011d9cd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
@@ -40,6 +40,7 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableType;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.crunch.util.PartitionUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -252,7 +253,11 @@ public class Sort {
       if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) {
         builder.sortComparatorClass(ReverseWritableComparator.class);
       } else {
-        TupleWritableComparator.configureOrdering(conf, columnOrders);
+        WritableType[] wt = new WritableType[columnOrders.length];
+        for (int i = 0; i < wt.length; i++) {
+          wt[i] = (WritableType) keyType.getSubTypes().get(i);
+        }
+        TupleWritableComparator.configureOrdering(conf, wt, columnOrders);
         builder.sortComparatorClass(TupleWritableComparator.class);
       }
     } else if (tf == AvroTypeFamily.getInstance()) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
index 07ee5b5..9677fc1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
@@ -17,13 +17,21 @@
  */
 package org.apache.crunch.lib.sort;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 
+import com.google.common.collect.Lists;
+import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.lib.Sort.ColumnOrder;
 import org.apache.crunch.lib.Sort.Order;
 import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.crunch.types.writable.WritableType;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
@@ -31,36 +39,29 @@ import org.apache.hadoop.io.WritableComparator;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
+import org.apache.hadoop.io.WritableFactories;
 
 public class TupleWritableComparator extends WritableComparator implements Configurable {
 
   private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
 
   private Configuration conf;
+  Writable[] w1;
+  Writable[] w2;
   private ColumnOrder[] columnOrders;
 
   public TupleWritableComparator() {
     super(TupleWritable.class, true);
   }
 
-  public static void configureOrdering(Configuration conf, Order... orders) {
-    conf.set(CRUNCH_ORDERING_PROPERTY,
-        Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
-          @Override
-          public String apply(Order o) {
-            return o.name();
-          }
-        })));
-  }
-
-  public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
-    conf.set(CRUNCH_ORDERING_PROPERTY,
-        Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
-          @Override
-          public String apply(ColumnOrder o) {
-            return o.column() + ";" + o.order().name();
-          }
-        })));
+  public static void configureOrdering(Configuration conf, WritableType[] types, ColumnOrder[] columnOrders) {
+    List<String> ordering = Lists.newArrayList();
+    for (int i = 0; i < types.length; i++) {
+      Class<?> cls = types[i].getSerializationClass();
+      String order = columnOrders[i].order().name();
+      ordering.add(cls.getCanonicalName() + ";" + order);
+    }
+    conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(ordering));
   }
 
   @Override
@@ -83,16 +84,22 @@ public class TupleWritableComparator extends WritableComparator implements Confi
       } else if (!ta.has(index) && tb.has(index)) {
         return -order;
       } else {
-        Writable v1 = ta.get(index);
-        Writable v2 = tb.get(index);
+        BytesWritable v1 = ta.get(index);
+        BytesWritable v2 = tb.get(index);
         if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
-          if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
-            int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
+          try {
+            w1[index].readFields(new DataInputStream(new ByteArrayInputStream(v1.getBytes())));
+            w2[index].readFields(new DataInputStream(new ByteArrayInputStream(v2.getBytes())));
+          } catch (IOException e) {
+            throw new CrunchRuntimeException(e);
+          }
+          if (w1[index] instanceof WritableComparable && w2[index] instanceof WritableComparable) {
+            int cmp = ((WritableComparable) w1[index]).compareTo((WritableComparable) w2[index]);
             if (cmp != 0) {
               return order * cmp;
             }
           } else {
-            int cmp = v1.hashCode() - v2.hashCode();
+            int cmp = w1[index].hashCode() - w2[index].hashCode();
             if (cmp != 0) {
               return order * cmp;
             }
@@ -115,11 +122,20 @@ public class TupleWritableComparator extends WritableComparator implements Confi
       String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
       String[] columnOrderNames = ordering.split(",");
       columnOrders = new ColumnOrder[columnOrderNames.length];
+      w1 = new Writable[columnOrderNames.length];
+      w2 = new Writable[columnOrderNames.length];
       for (int i = 0; i < columnOrders.length; i++) {
         String[] split = columnOrderNames[i].split(";");
-        int column = Integer.parseInt(split[0]);
+        String className = split[0];
+        try {
+          Class cls = Class.forName(className);
+          w1[i] = WritableFactories.newInstance(cls);
+          w2[i] = WritableFactories.newInstance(cls);
+        } catch (Exception e) {
+          throw new CrunchRuntimeException(e);
+        }
         Order order = Order.valueOf(split[1]);
-        columnOrders[i] = ColumnOrder.by(column, order);
+        columnOrders[i] = ColumnOrder.by(i + 1, order);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
index 8b54008..9731ff4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
@@ -23,20 +23,17 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableUtils;
 
 /**
  * A {@link Writable} for marshalling/unmarshalling Collections. Note that
  * element order is <em>undefined</em>!
  *
- * @param <T> The value type
  */
-class GenericArrayWritable<T> implements Writable {
-  private Writable[] values;
+class GenericArrayWritable implements Writable {
+  private BytesWritable[] values;
   private Class<? extends Writable> valueClass;
 
   public GenericArrayWritable(Class<? extends Writable> valueClass) {
@@ -47,43 +44,29 @@ class GenericArrayWritable<T> implements Writable {
     // for deserialization
   }
 
-  public void set(Writable[] values) {
+  public void set(BytesWritable[] values) {
     this.values = values;
   }
 
-  public Writable[] get() {
+  public BytesWritable[] get() {
     return values;
   }
 
   public void readFields(DataInput in) throws IOException {
-    values = new Writable[WritableUtils.readVInt(in)]; // construct values
+    values = new BytesWritable[WritableUtils.readVInt(in)]; // construct values
     if (values.length > 0) {
       int nulls = WritableUtils.readVInt(in);
       if (nulls == values.length) {
         return;
       }
-      String valueType = Text.readString(in);
-      setValueType(valueType);
       for (int i = 0; i < values.length - nulls; i++) {
-        Writable value = WritableFactories.newInstance(valueClass);
+        BytesWritable value = new BytesWritable();
         value.readFields(in); // read a value
         values[i] = value; // store it in values
       }
     }
   }
 
-  protected void setValueType(String valueType) {
-    if (valueClass == null) {
-      try {
-        valueClass = Class.forName(valueType).asSubclass(Writable.class);
-      } catch (ClassNotFoundException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    } else if (!valueType.equals(valueClass.getName())) {
-      throw new IllegalStateException("Incoming " + valueType + " is not " + valueClass);
-    }
-  }
-
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, values.length);
     if (values.length > 0) {
@@ -95,10 +78,6 @@ class GenericArrayWritable<T> implements Writable {
       }
       WritableUtils.writeVInt(out, nulls);
       if (values.length - nulls > 0) {
-        if (valueClass == null) {
-          throw new IllegalStateException("Value class not set by constructor or read");
-        }
-        Text.writeString(out, valueClass.getName());
         for (int i = 0; i < values.length; i++) {
           if (values[i] != null) {
             values[i].write(out);

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
index 1ab51df..d25bd82 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
@@ -23,63 +23,46 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
 import com.google.common.collect.Maps;
 
-class TextMapWritable<T extends Writable> implements Writable {
+class TextMapWritable implements Writable {
 
-  private Class<T> valueClazz;
-  private final Map<Text, T> instance;
+  private final Map<Text, BytesWritable> instance;
 
   public TextMapWritable() {
     this.instance = Maps.newHashMap();
   }
 
-  public TextMapWritable(Class<T> valueClazz) {
-    this.valueClazz = valueClazz;
-    this.instance = Maps.newHashMap();
-  }
-
-  public void put(Text txt, T value) {
+  public void put(Text txt, BytesWritable value) {
     instance.put(txt, value);
   }
 
-  public Set<Map.Entry<Text, T>> entrySet() {
+  public Set<Map.Entry<Text, BytesWritable>> entrySet() {
     return instance.entrySet();
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     instance.clear();
-    try {
-      this.valueClazz = (Class<T>) Class.forName(Text.readString(in));
-    } catch (ClassNotFoundException e) {
-      throw (IOException) new IOException("Failed map init").initCause(e);
-    }
     int entries = WritableUtils.readVInt(in);
-    try {
-      for (int i = 0; i < entries; i++) {
-        Text txt = new Text();
-        txt.readFields(in);
-        T value = valueClazz.newInstance();
-        value.readFields(in);
-        instance.put(txt, value);
-      }
-    } catch (IllegalAccessException e) {
-      throw (IOException) new IOException("Failed map init").initCause(e);
-    } catch (InstantiationException e) {
-      throw (IOException) new IOException("Failed map init").initCause(e);
+    for (int i = 0; i < entries; i++) {
+      Text txt = new Text();
+      txt.readFields(in);
+      BytesWritable value = new BytesWritable();
+      value.readFields(in);
+      instance.put(txt, value);
     }
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    Text.writeString(out, valueClazz.getName());
     WritableUtils.writeVInt(out, instance.size());
-    for (Map.Entry<Text, T> e : instance.entrySet()) {
+    for (Map.Entry<Text, BytesWritable> e : instance.entrySet()) {
       e.getKey().write(out);
       e.getValue().write(out);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
index 1c3536b..251e4f5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -17,14 +17,19 @@
  */
 package org.apache.crunch.types.writable;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.io.Text;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableUtils;
 
 /**
@@ -35,8 +40,9 @@ import org.apache.hadoop.io.WritableUtils;
 public class TupleWritable implements WritableComparable<TupleWritable> {
 
   private long written;
-  private Writable[] values;
-
+  private BytesWritable[] values;
+  private List<Class<Writable>> writableClasses;
+  
   /**
    * Create an empty tuple with no allocated storage for writables.
    */
@@ -47,11 +53,15 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
    * Initialize tuple with storage; unknown whether any of them contain
    * &quot;written&quot; values.
    */
-  public TupleWritable(Writable[] vals) {
+  public TupleWritable(BytesWritable[] vals) {
     written = 0L;
     values = vals;
   }
 
+  public void setWritableClasses(List<Class<Writable>> writableClasses) {
+    this.writableClasses = writableClasses;
+  }
+  
   /**
    * Return true if tuple has an element at the position provided.
    */
@@ -62,7 +72,7 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
   /**
    * Get ith Writable from Tuple.
    */
-  public Writable get(int i) {
+  public BytesWritable get(int i) {
     return values[i];
   }
 
@@ -110,7 +120,19 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
   public String toString() {
     StringBuffer buf = new StringBuffer("[");
     for (int i = 0; i < values.length; ++i) {
-      buf.append(has(i) ? values[i].toString() : "");
+      if (has(i)) {
+        if (writableClasses != null) {
+          Writable w = WritableFactories.newInstance(writableClasses.get(i));
+          try {
+            w.readFields(new DataInputStream(new ByteArrayInputStream(values[i].getBytes())));
+          } catch (IOException e) {
+            throw new CrunchRuntimeException(e);
+          }
+          buf.append(w.toString());
+        } else {
+          buf.append(values[i].toString());
+        }
+      }
       buf.append(",");
     }
     if (values.length != 0)
@@ -131,11 +153,6 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
     WritableUtils.writeVLong(out, written);
     for (int i = 0; i < values.length; ++i) {
       if (has(i)) {
-        Text.writeString(out, values[i].getClass().getName());
-      }
-    }
-    for (int i = 0; i < values.length; ++i) {
-      if (has(i)) {
         values[i].write(out);
       }
     }
@@ -144,31 +161,15 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
   /**
    * {@inheritDoc}
    */
-  @SuppressWarnings("unchecked")
-  // No static typeinfo on Tuples
   public void readFields(DataInput in) throws IOException {
     int card = WritableUtils.readVInt(in);
-    values = new Writable[card];
+    values = new BytesWritable[card];
     written = WritableUtils.readVLong(in);
-    Class<? extends Writable>[] cls = new Class[card];
-    try {
-      for (int i = 0; i < card; ++i) {
-        if (has(i)) {
-          cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
-        }
-      }
-      for (int i = 0; i < card; ++i) {
-        if (has(i)) {
-          values[i] = cls[i].newInstance();
-          values[i].readFields(in);
-        }
+    for (int i = 0; i < card; ++i) {
+      if (has(i)) {
+        values[i] = new BytesWritable();
+        values[i].readFields(in);
       }
-    } catch (ClassNotFoundException e) {
-      throw (IOException) new IOException("Failed tuple init").initCause(e);
-    } catch (IllegalAccessException e) {
-      throw (IOException) new IOException("Failed tuple init").initCause(e);
-    } catch (InstantiationException e) {
-      throw (IOException) new IOException("Failed tuple init").initCause(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
index 78cf3ae..0273e5e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -17,11 +17,15 @@
  */
 package org.apache.crunch.types.writable;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Tuple;
@@ -44,6 +48,8 @@ import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 import com.google.common.collect.ImmutableMap;
@@ -273,6 +279,16 @@ public class Writables {
     return new WritableTableType((WritableType) key, (WritableType) value);
   }
 
+  private static <W extends Writable> W create(Class<W> clazz, BytesWritable bytes) {
+    W instance = (W) WritableFactories.newInstance(clazz);
+    try {
+      instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes.getBytes())));
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+    return instance;
+  }
+  
   /**
    * For mapping from {@link TupleWritable} instances to {@link Tuple}s.
    * 
@@ -280,14 +296,17 @@ public class Writables {
   private static class TWTupleMapFn extends MapFn<TupleWritable, Tuple> {
     private final TupleFactory<?> tupleFactory;
     private final List<MapFn> fns;
-
+    private final List<Class<Writable>> writableClasses;
+    
     private transient Object[] values;
 
-    public TWTupleMapFn(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
+    public TWTupleMapFn(TupleFactory<?> tupleFactory, WritableType<?, ?>... ptypes) {
       this.tupleFactory = tupleFactory;
       this.fns = Lists.newArrayList();
-      for (PType ptype : ptypes) {
+      this.writableClasses = Lists.newArrayList();
+      for (WritableType ptype : ptypes) {
         fns.add(ptype.getInputMapFn());
+        writableClasses.add(ptype.getSerializationClass());
       }
     }
 
@@ -321,7 +340,8 @@ public class Writables {
     public Tuple map(TupleWritable in) {
       for (int i = 0; i < values.length; i++) {
         if (in.has(i)) {
-          values[i] = fns.get(i).map(in.get(i));
+          Writable w = create(writableClasses.get(i), in.get(i));
+          values[i] = fns.get(i).map(w);
         } else {
           values[i] = null;
         }
@@ -337,14 +357,17 @@ public class Writables {
   private static class TupleTWMapFn extends MapFn<Tuple, TupleWritable> {
 
     private transient TupleWritable writable;
-    private transient Writable[] values;
+    private transient BytesWritable[] values;
 
     private final List<MapFn> fns;
-
+    private final List<Class<Writable>> writableClasses;
+    
     public TupleTWMapFn(PType<?>... ptypes) {
       this.fns = Lists.newArrayList();
+      this.writableClasses = Lists.newArrayList();
       for (PType<?> ptype : ptypes) {
         fns.add(ptype.getOutputMapFn());
+        writableClasses.add(((WritableType) ptype).getSerializationClass());
       }
     }
 
@@ -364,8 +387,9 @@ public class Writables {
     
     @Override
     public void initialize() {
-      this.values = new Writable[fns.size()];
+      this.values = new BytesWritable[fns.size()];
       this.writable = new TupleWritable(values);
+      this.writable.setWritableClasses(writableClasses);
       for (MapFn fn : fns) {
         fn.initialize();
       }
@@ -378,7 +402,8 @@ public class Writables {
         Object value = input.get(i);
         if (value != null) {
           writable.setWritten(i);
-          values[i] = (Writable) fns.get(i).map(value);
+          Writable w = (Writable) fns.get(i).map(value);
+          values[i] = new BytesWritable(WritableUtils.toByteArray(w));
         }
       }
       return writable;
@@ -386,38 +411,46 @@ public class Writables {
   }
 
   public static <V1, V2> WritableType<Pair<V1, V2>, TupleWritable> pairs(PType<V1> p1, PType<V2> p2) {
-    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.PAIR, p1, p2);
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.PAIR, (WritableType) p1, (WritableType) p2);
     TupleTWMapFn output = new TupleTWMapFn(p1, p2);
     return new WritableType(Pair.class, TupleWritable.class, input, output, p1, p2);
   }
 
   public static <V1, V2, V3> WritableType<Tuple3<V1, V2, V3>, TupleWritable> triples(PType<V1> p1, PType<V2> p2,
       PType<V3> p3) {
-    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, p1, p2, p3);
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, (WritableType) p1,
+        (WritableType) p2, (WritableType) p3);
     TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3);
     return new WritableType(Tuple3.class, TupleWritable.class, input, output, p1, p2, p3);
   }
 
   public static <V1, V2, V3, V4> WritableType<Tuple4<V1, V2, V3, V4>, TupleWritable> quads(PType<V1> p1, PType<V2> p2,
       PType<V3> p3, PType<V4> p4) {
-    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, p1, p2, p3, p4);
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, (WritableType) p1,
+        (WritableType) p2, (WritableType) p3, (WritableType) p4);
     TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3, p4);
     return new WritableType(Tuple4.class, TupleWritable.class, input, output, p1, p2, p3, p4);
   }
 
   public static WritableType<TupleN, TupleWritable> tuples(PType... ptypes) {
-    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLEN, ptypes);
+    WritableType[] wt = new WritableType[ptypes.length];
+    for (int i = 0; i < wt.length; i++) {
+      wt[i] = (WritableType) ptypes[i];
+    }
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLEN, wt);
     TupleTWMapFn output = new TupleTWMapFn(ptypes);
     return new WritableType(TupleN.class, TupleWritable.class, input, output, ptypes);
   }
 
   public static <T extends Tuple> PType<T> tuples(Class<T> clazz, PType... ptypes) {
     Class[] typeArgs = new Class[ptypes.length];
+    WritableType[] wt = new WritableType[ptypes.length];
     for (int i = 0; i < typeArgs.length; i++) {
       typeArgs[i] = ptypes[i].getTypeClass();
+      wt[i] = (WritableType) ptypes[i];
     }
     TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
-    TWTupleMapFn input = new TWTupleMapFn(factory, ptypes);
+    TWTupleMapFn input = new TWTupleMapFn(factory, wt);
     TupleTWMapFn output = new TupleTWMapFn(ptypes);
     return new WritableType(clazz, TupleWritable.class, input, output, ptypes);
   }
@@ -430,9 +463,11 @@ public class Writables {
   }
 
   private static class ArrayCollectionMapFn<T> extends MapFn<GenericArrayWritable, Collection<T>> {
+    private Class<Writable> clazz;
     private final MapFn<Object, T> mapFn;
-
-    public ArrayCollectionMapFn(MapFn<Object, T> mapFn) {
+    
+    public ArrayCollectionMapFn(Class<Writable> clazz, MapFn<Object, T> mapFn) {
+      this.clazz = clazz;
       this.mapFn = mapFn;
     }
 
@@ -454,8 +489,9 @@ public class Writables {
     @Override
     public Collection<T> map(GenericArrayWritable input) {
       Collection<T> collection = Lists.newArrayList();
-      for (Writable writable : input.get()) {
-        collection.add(mapFn.map(writable));
+      for (BytesWritable raw : input.get()) {
+        Writable w = create(clazz, raw);
+        collection.add(mapFn.map(w));
       }
       return collection;
     }
@@ -463,11 +499,9 @@ public class Writables {
 
   private static class CollectionArrayMapFn<T> extends MapFn<Collection<T>, GenericArrayWritable> {
 
-    private final Class<? extends Writable> clazz;
     private final MapFn<T, Object> mapFn;
 
-    public CollectionArrayMapFn(Class<? extends Writable> clazz, MapFn<T, Object> mapFn) {
-      this.clazz = clazz;
+    public CollectionArrayMapFn(MapFn<T, Object> mapFn) {
       this.mapFn = mapFn;
     }
 
@@ -488,27 +522,31 @@ public class Writables {
 
     @Override
     public GenericArrayWritable map(Collection<T> input) {
-      GenericArrayWritable arrayWritable = new GenericArrayWritable(clazz);
-      Writable[] w = new Writable[input.size()];
+      GenericArrayWritable arrayWritable = new GenericArrayWritable();
+      BytesWritable[] w = new BytesWritable[input.size()];
       int index = 0;
       for (T in : input) {
-        w[index++] = ((Writable) mapFn.map(in));
+        Writable v = (Writable) mapFn.map(in);
+        w[index++] = new BytesWritable(WritableUtils.toByteArray(v));
       }
       arrayWritable.set(w);
       return arrayWritable;
     }
   }
 
-  public static <T> WritableType<Collection<T>, GenericArrayWritable<T>> collections(PType<T> ptype) {
+  public static <T> WritableType<Collection<T>, GenericArrayWritable> collections(PType<T> ptype) {
     WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
-    return new WritableType(Collection.class, GenericArrayWritable.class, new ArrayCollectionMapFn(wt.getInputMapFn()),
-        new CollectionArrayMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
+    return new WritableType(Collection.class, GenericArrayWritable.class,
+        new ArrayCollectionMapFn(wt.getSerializationClass(), wt.getInputMapFn()),
+        new CollectionArrayMapFn(wt.getOutputMapFn()), ptype);
   }
 
-  private static class MapInputMapFn<T> extends MapFn<TextMapWritable<Writable>, Map<String, T>> {
+  private static class MapInputMapFn<T> extends MapFn<TextMapWritable, Map<String, T>> {
+    private final Class<Writable> clazz;
     private final MapFn<Writable, T> mapFn;
 
-    public MapInputMapFn(MapFn<Writable, T> mapFn) {
+    public MapInputMapFn(Class<Writable> clazz, MapFn<Writable, T> mapFn) {
+      this.clazz = clazz;
       this.mapFn = mapFn;
     }
 
@@ -528,22 +566,21 @@ public class Writables {
     }
 
     @Override
-    public Map<String, T> map(TextMapWritable<Writable> input) {
+    public Map<String, T> map(TextMapWritable input) {
       Map<String, T> out = Maps.newHashMap();
-      for (Map.Entry<Text, Writable> e : input.entrySet()) {
-        out.put(e.getKey().toString(), mapFn.map(e.getValue()));
+      for (Map.Entry<Text, BytesWritable> e : input.entrySet()) {
+        Writable v = create(clazz, e.getValue());
+        out.put(e.getKey().toString(), mapFn.map(v));
       }
       return out;
     }
   }
 
-  private static class MapOutputMapFn<T> extends MapFn<Map<String, T>, TextMapWritable<Writable>> {
+  private static class MapOutputMapFn<T> extends MapFn<Map<String, T>, TextMapWritable> {
 
-    private final Class<Writable> clazz;
     private final MapFn<T, Writable> mapFn;
 
-    public MapOutputMapFn(Class<Writable> clazz, MapFn<T, Writable> mapFn) {
-      this.clazz = clazz;
+    public MapOutputMapFn(MapFn<T, Writable> mapFn) {
       this.mapFn = mapFn;
     }
 
@@ -563,10 +600,11 @@ public class Writables {
     }
 
     @Override
-    public TextMapWritable<Writable> map(Map<String, T> input) {
-      TextMapWritable<Writable> tmw = new TextMapWritable<Writable>(clazz);
+    public TextMapWritable map(Map<String, T> input) {
+      TextMapWritable tmw = new TextMapWritable();
       for (Map.Entry<String, T> e : input.entrySet()) {
-        tmw.put(new Text(e.getKey()), mapFn.map(e.getValue()));
+        Writable w = mapFn.map(e.getValue());
+        tmw.put(new Text(e.getKey()), new BytesWritable(WritableUtils.toByteArray(w)));
       }
       return tmw;
     }
@@ -574,8 +612,9 @@ public class Writables {
 
   public static <T> WritableType<Map<String, T>, MapWritable> maps(PType<T> ptype) {
     WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
-    return new WritableType(Map.class, TextMapWritable.class, new MapInputMapFn(wt.getInputMapFn()),
-        new MapOutputMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
+    return new WritableType(Map.class, TextMapWritable.class,
+        new MapInputMapFn(wt.getSerializationClass(), wt.getInputMapFn()),
+        new MapOutputMapFn(wt.getOutputMapFn()), ptype);
   }
 
   public static <T> PType<T> jsons(Class<T> clazz) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
index 35ccc11..e8727c3 100644
--- a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
@@ -21,9 +21,10 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.crunch.lib.join.JoinUtils.TupleWritablePartitioner;
 import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -39,30 +40,9 @@ public class TupleWritablePartitionerTest {
   @Test
   public void testGetPartition() {
     IntWritable intWritable = new IntWritable(3);
-    TupleWritable key = new TupleWritable(new Writable[] { intWritable });
-    assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
-    assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
+    BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(intWritable));
+    TupleWritable key = new TupleWritable(new BytesWritable[] { bw });
+    assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
+    assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
   }
-
-  @Test
-  public void testGetPartition_NegativeHashValue() {
-    IntWritable intWritable = new IntWritable(-3);
-    // Sanity check, if this doesn't work then the premise of this test is wrong
-    assertEquals(-3, intWritable.hashCode());
-
-    TupleWritable key = new TupleWritable(new Writable[] { intWritable });
-    assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
-    assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
-  }
-
-  @Test
-  public void testGetPartition_IntegerMinValue() {
-    IntWritable intWritable = new IntWritable(Integer.MIN_VALUE);
-    // Sanity check, if this doesn't work then the premise of this test is wrong
-    assertEquals(Integer.MIN_VALUE, intWritable.hashCode());
-
-    TupleWritable key = new TupleWritable(new Writable[] { intWritable });
-    assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), Integer.MAX_VALUE));
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
index c807a90..c446a69 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertThat;
 import java.util.Arrays;
 
 import org.apache.crunch.test.Tests;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.junit.Test;
@@ -35,36 +36,38 @@ public class GenericArrayWritableTest {
 
   @Test
   public void testEmpty() {
-    GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
-    src.set(new Text[0]);
+    GenericArrayWritable src = new GenericArrayWritable();
+    src.set(new BytesWritable[0]);
 
-    GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+    GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable());
 
     assertThat(dest.get().length, is(0));
   }
 
   @Test
   public void testNonEmpty() {
-    GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
-    src.set(new Text[] { new Text("foo"), new Text("bar") });
+    GenericArrayWritable src = new GenericArrayWritable();
+    src.set(new BytesWritable[] { new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes()) });
 
-    GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+    GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable());
 
     assertThat(src.get(), not(sameInstance(dest.get())));
     assertThat(dest.get().length, is(2));
-    assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("foo"), new Text("bar")));
+    assertThat(Arrays.asList(dest.get()),
+        hasItems(new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes())));
   }
 
   @Test
   public void testNulls() {
-    GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class);
-    src.set(new Text[] { new Text("a"), null, new Text("b") });
+    GenericArrayWritable src = new GenericArrayWritable();
+    src.set(new BytesWritable[] { new BytesWritable("a".getBytes()), null, new BytesWritable("b".getBytes()) });
 
-    GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>());
+    GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable());
 
     assertThat(src.get(), not(sameInstance(dest.get())));
     assertThat(dest.get().length, is(3));
-    assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("a"), new Text("b"), null));
+    assertThat(Arrays.asList(dest.get()),
+        hasItems(new BytesWritable("a".getBytes()), new BytesWritable("b".getBytes()), null));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
index 65e946b..19a9bfe 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
@@ -58,7 +58,7 @@ public class WritableTypeTest {
   @Test
   public void testGetDetachedValue_Collection() {
     Collection<Text> textCollection = Lists.newArrayList(new Text("value"));
-    WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables
+    WritableType<Collection<Text>, GenericArrayWritable> ptype = Writables
         .collections(Writables.writables(Text.class));
     ptype.initialize(new Configuration());
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/c51ef57a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
index 5396fba..b1f4107 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -111,15 +112,22 @@ public class WritablesTest {
     String s = "abc";
     Collection<String> j = Lists.newArrayList();
     j.add(s);
-    GenericArrayWritable<Text> w = new GenericArrayWritable<Text>(Text.class);
-    w.set(new Text[] { new Text(s) });
+    GenericArrayWritable w = new GenericArrayWritable();
+    Text t = new Text(s);
+    BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(t));
+    w.set(new BytesWritable[] { bw });
     testInputOutputFn(Writables.collections(Writables.strings()), j, w);
   }
 
   @Test
   public void testPairs() throws Exception {
     Pair<String, String> j = Pair.of("a", "b");
-    TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), });
+    Text[] t = new Text[] { new Text("a"), new Text("b"), };
+    BytesWritable[] b = new BytesWritable[t.length];
+    for (int i = 0; i < t.length; i++) {
+      b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
+    }
+    TupleWritable w = new TupleWritable(b);
     w.setWritten(0);
     w.setWritten(1);
     testInputOutputFn(Writables.pairs(Writables.strings(), Writables.strings()), j, w);
@@ -144,7 +152,12 @@ public class WritablesTest {
   @SuppressWarnings("rawtypes")
   public void testTriples() throws Exception {
     Tuple3 j = Tuple3.of("a", "b", "c");
-    TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), });
+    Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), };
+    BytesWritable[] b = new BytesWritable[t.length];
+    for (int i = 0; i < t.length; i++) {
+      b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
+    }
+    TupleWritable w = new TupleWritable(b);
     w.setWritten(0);
     w.setWritten(1);
     w.setWritten(2);
@@ -156,7 +169,12 @@ public class WritablesTest {
   @SuppressWarnings("rawtypes")
   public void testQuads() throws Exception {
     Tuple4 j = Tuple4.of("a", "b", "c", "d");
-    TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), });
+    Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), };
+    BytesWritable[] b = new BytesWritable[t.length];
+    for (int i = 0; i < t.length; i++) {
+      b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
+    }
+    TupleWritable w = new TupleWritable(b);
     w.setWritten(0);
     w.setWritten(1);
     w.setWritten(2);
@@ -169,8 +187,13 @@ public class WritablesTest {
   @Test
   public void testTupleN() throws Exception {
     TupleN j = new TupleN("a", "b", "c", "d", "e");
-    TupleWritable w = new TupleWritable(new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"),
-        new Text("e"), });
+    Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"),
+        new Text("e"), };
+    BytesWritable[] b = new BytesWritable[t.length];
+    for (int i = 0; i < t.length; i++) {
+      b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
+    }
+    TupleWritable w = new TupleWritable(b);
     w.setWritten(0);
     w.setWritten(1);
     w.setWritten(2);