You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/02/15 00:03:47 UTC

git commit: CRUNCH-329: Fix secondary sorts for writables by re-introducing type info.

Repository: crunch
Updated Branches:
  refs/heads/master fce2b23b8 -> 1a160b653


CRUNCH-329: Fix secondary sorts for writables by re-introducing type info.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1a160b65
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1a160b65
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1a160b65

Branch: refs/heads/master
Commit: 1a160b653509eaa4ce0e9c42ed5919a5ac545b98
Parents: fce2b23
Author: Josh Wills <jw...@apache.org>
Authored: Wed Jan 22 19:54:45 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Feb 13 13:03:29 2014 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/lib/SecondarySortIT.java  |  19 ++-
 .../lib/sort/TupleWritableComparator.java       |  44 +-----
 .../crunch/types/writable/TupleWritable.java    | 127 ++++++++-------
 .../apache/crunch/types/writable/Writables.java | 158 ++++++++++++++++---
 .../lib/TupleWritablePartitionerTest.java       |   3 +-
 .../crunch/types/writable/WritablesTest.java    |  39 +----
 6 files changed, 236 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
index 242f621..7284ab1 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
@@ -17,7 +17,6 @@
  */
 package org.apache.crunch.lib;
 
-import static org.apache.crunch.types.avro.Avros.*;
 import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
@@ -29,6 +28,9 @@ import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.From;
 import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
@@ -38,7 +40,16 @@ import com.google.common.collect.ImmutableList;
 public class SecondarySortIT extends CrunchTestSupport implements Serializable {
 
   @Test
-  public void testSecondarySort() throws Exception {
+  public void testSecondarySortAvros() throws Exception {
+    runSecondarySort(AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testSecondarySortWritables() throws Exception {
+    runSecondarySort(WritableTypeFamily.getInstance());
+  }
+
+  public void runSecondarySort(PTypeFamily ptf) throws Exception {
     Pipeline p = new MRPipeline(SecondarySortIT.class, tempDir.getDefaultConfiguration());
     String inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt");
     
@@ -50,14 +61,14 @@ public class SecondarySortIT extends CrunchTestSupport implements Serializable {
             return Pair.of(pieces[0],
                 Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim())));
           }
-        }, tableOf(strings(), pairs(ints(), ints())));
+        }, ptf.tableOf(ptf.strings(), ptf.pairs(ptf.ints(), ptf.ints())));
     Iterable<String> lines = SecondarySort.sortAndApply(in, new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() {
       @Override
       public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) {
         Joiner j = Joiner.on(',');
         return j.join(input.first(), j.join(input.second()));
       }
-    }, strings()).materialize();
+    }, ptf.strings()).materialize();
     assertEquals(ImmutableList.of("one,[-5,10],[1,1],[2,-3]", "three,[0,-1]", "two,[1,7],[2,6],[4,5]"),
         ImmutableList.copyOf(lines));
     p.done();

http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
index 9677fc1..9d16821 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/TupleWritableComparator.java
@@ -17,37 +17,26 @@
  */
 package org.apache.crunch.lib.sort;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 
 import com.google.common.collect.Lists;
-import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.lib.Sort.ColumnOrder;
 import org.apache.crunch.lib.Sort.Order;
 import org.apache.crunch.types.writable.TupleWritable;
 import org.apache.crunch.types.writable.WritableType;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 
-import com.google.common.base.Function;
 import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.io.WritableFactories;
 
 public class TupleWritableComparator extends WritableComparator implements Configurable {
 
   private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
 
   private Configuration conf;
-  Writable[] w1;
-  Writable[] w2;
   private ColumnOrder[] columnOrders;
 
   public TupleWritableComparator() {
@@ -57,9 +46,7 @@ public class TupleWritableComparator extends WritableComparator implements Confi
   public static void configureOrdering(Configuration conf, WritableType[] types, ColumnOrder[] columnOrders) {
     List<String> ordering = Lists.newArrayList();
     for (int i = 0; i < types.length; i++) {
-      Class<?> cls = types[i].getSerializationClass();
-      String order = columnOrders[i].order().name();
-      ordering.add(cls.getCanonicalName() + ";" + order);
+      ordering.add(columnOrders[i].order().name());
     }
     conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(ordering));
   }
@@ -84,22 +71,16 @@ public class TupleWritableComparator extends WritableComparator implements Confi
       } else if (!ta.has(index) && tb.has(index)) {
         return -order;
       } else {
-        BytesWritable v1 = ta.get(index);
-        BytesWritable v2 = tb.get(index);
+        Writable v1 = ta.get(index);
+        Writable v2 = tb.get(index);
         if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
-          try {
-            w1[index].readFields(new DataInputStream(new ByteArrayInputStream(v1.getBytes())));
-            w2[index].readFields(new DataInputStream(new ByteArrayInputStream(v2.getBytes())));
-          } catch (IOException e) {
-            throw new CrunchRuntimeException(e);
-          }
-          if (w1[index] instanceof WritableComparable && w2[index] instanceof WritableComparable) {
-            int cmp = ((WritableComparable) w1[index]).compareTo((WritableComparable) w2[index]);
+          if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+            int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
             if (cmp != 0) {
               return order * cmp;
             }
           } else {
-            int cmp = w1[index].hashCode() - w2[index].hashCode();
+            int cmp = v1.hashCode() - v2.hashCode();
             if (cmp != 0) {
               return order * cmp;
             }
@@ -122,19 +103,8 @@ public class TupleWritableComparator extends WritableComparator implements Confi
       String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
       String[] columnOrderNames = ordering.split(",");
       columnOrders = new ColumnOrder[columnOrderNames.length];
-      w1 = new Writable[columnOrderNames.length];
-      w2 = new Writable[columnOrderNames.length];
       for (int i = 0; i < columnOrders.length; i++) {
-        String[] split = columnOrderNames[i].split(";");
-        String className = split[0];
-        try {
-          Class cls = Class.forName(className);
-          w1[i] = WritableFactories.newInstance(cls);
-          w2[i] = WritableFactories.newInstance(cls);
-        } catch (Exception e) {
-          throw new CrunchRuntimeException(e);
-        }
-        Order order = Order.valueOf(split[1]);
+        Order order = Order.valueOf(columnOrderNames[i]);
         columnOrders[i] = ColumnOrder.by(i + 1, order);
       }
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
index 251e4f5..1362132 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -17,16 +17,16 @@
  */
 package org.apache.crunch.types.writable;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.List;
+import java.util.Arrays;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.CrunchRuntimeException;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableFactories;
@@ -37,42 +37,64 @@ import org.apache.hadoop.io.WritableUtils;
  * added here because of its package visibility restrictions.
  * 
  */
-public class TupleWritable implements WritableComparable<TupleWritable> {
+public class TupleWritable extends Configured implements WritableComparable<TupleWritable> {
+
+  private int[] written;
+  private Writable[] values;
 
-  private long written;
-  private BytesWritable[] values;
-  private List<Class<Writable>> writableClasses;
-  
   /**
    * Create an empty tuple with no allocated storage for writables.
    */
   public TupleWritable() {
   }
 
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf == null) return;
+
+    try {
+      Writables.reloadWritableComparableCodes(conf);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Error reloading writable comparable codes", e);
+    }
+  }
+
+  private static int[] getCodes(Writable[] writables) {
+    int[] b = new int[writables.length];
+    for (int i = 0; i < b.length; i++) {
+      if (writables[i] != null) {
+        b[i] = getCode(writables[i].getClass());
+      }
+    }
+    return b;
+  }
+
+  public TupleWritable(Writable[] values) {
+    this(values, getCodes(values));
+  }
+
   /**
    * Initialize tuple with storage; unknown whether any of them contain
    * &quot;written&quot; values.
    */
-  public TupleWritable(BytesWritable[] vals) {
-    written = 0L;
-    values = vals;
+  public TupleWritable(Writable[] values, int[] written) {
+    Preconditions.checkArgument(values.length == written.length);
+    this.written = written;
+    this.values = values;
   }
 
-  public void setWritableClasses(List<Class<Writable>> writableClasses) {
-    this.writableClasses = writableClasses;
-  }
-  
   /**
    * Return true if tuple has an element at the position provided.
    */
   public boolean has(int i) {
-    return 0 != ((1 << i) & written);
+    return written[i] != 0;
   }
 
   /**
    * Get ith Writable from Tuple.
    */
-  public BytesWritable get(int i) {
+  public Writable get(int i) {
     return values[i];
   }
 
@@ -89,13 +111,13 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
   public boolean equals(Object other) {
     if (other instanceof TupleWritable) {
       TupleWritable that = (TupleWritable) other;
-      if (this.size() != that.size() || this.written != that.written) {
+      if (this.size() != that.size()) {
         return false;
       }
       for (int i = 0; i < values.length; ++i) {
         if (!has(i))
           continue;
-        if (!values[i].equals(that.get(i))) {
+        if (written[i] != that.written[i] || !values[i].equals(that.values[i])) {
           return false;
         }
       }
@@ -121,17 +143,7 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
     StringBuffer buf = new StringBuffer("[");
     for (int i = 0; i < values.length; ++i) {
       if (has(i)) {
-        if (writableClasses != null) {
-          Writable w = WritableFactories.newInstance(writableClasses.get(i));
-          try {
-            w.readFields(new DataInputStream(new ByteArrayInputStream(values[i].getBytes())));
-          } catch (IOException e) {
-            throw new CrunchRuntimeException(e);
-          }
-          buf.append(w.toString());
-        } else {
-          buf.append(values[i].toString());
-        }
+        buf.append(values[i].toString());
       }
       buf.append(",");
     }
@@ -142,6 +154,15 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
     return buf.toString();
   }
 
+  public void clear() {
+    Arrays.fill(written, (byte) 0);
+  }
+
+  public void set(int index, Writable w) {
+    written[index] = getCode(w.getClass());
+    values[index] = w;
+  }
+
   /**
    * Writes each Writable to <code>out</code>. TupleWritable format:
    * {@code
@@ -150,9 +171,9 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
    */
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, values.length);
-    WritableUtils.writeVLong(out, written);
     for (int i = 0; i < values.length; ++i) {
-      if (has(i)) {
+      WritableUtils.writeVInt(out, written[i]);
+      if (written[i] != 0) {
         values[i].write(out);
       }
     }
@@ -163,36 +184,32 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
    */
   public void readFields(DataInput in) throws IOException {
     int card = WritableUtils.readVInt(in);
-    values = new BytesWritable[card];
-    written = WritableUtils.readVLong(in);
+    values = new Writable[card];
+    written = new int[card];
     for (int i = 0; i < card; ++i) {
-      if (has(i)) {
-        values[i] = new BytesWritable();
+      written[i] = WritableUtils.readVInt(in);
+      if (written[i] != 0) {
+        values[i] = getWritable(written[i], getConf());
         values[i].readFields(in);
       }
     }
   }
 
-  /**
-   * Record that the tuple contains an element at the position provided.
-   */
-  public void setWritten(int i) {
-    written |= 1 << i;
-  }
-
-  /**
-   * Record that the tuple does not contain an element at the position provided.
-   */
-  public void clearWritten(int i) {
-    written &= -1 ^ (1 << i);
+  static int getCode(Class<? extends Writable> clazz) {
+    if (Writables.WRITABLE_CODES.inverse().containsKey(clazz)) {
+      return Writables.WRITABLE_CODES.inverse().get(clazz);
+    } else {
+      return 1; // default for BytesWritable
+    }
   }
 
-  /**
-   * Clear any record of which writables have been written to, without releasing
-   * storage.
-   */
-  public void clearWritten() {
-    written = 0L;
+  static Writable getWritable(int code, Configuration conf) {
+    Class<? extends Writable> clazz = Writables.WRITABLE_CODES.get(code);
+    if (clazz != null) {
+      return WritableFactories.newInstance(clazz, conf);
+    } else {
+      throw new IllegalStateException("Unknown Writable code: " + code);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
index d8ad6ca..a121ae3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -18,13 +18,23 @@
 package org.apache.crunch.types.writable;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableBiMap;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
@@ -49,6 +59,7 @@ import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
@@ -63,6 +74,78 @@ import com.google.common.collect.Maps;
  * 
  */
 public class Writables {
+
+  private static final Log LOG = LogFactory.getLog(Writables.class);
+
+  static BiMap<Integer, Class<? extends Writable>> WRITABLE_CODES = HashBiMap.create(ImmutableBiMap.<Integer, Class<? extends Writable>>builder()
+          .put(1, BytesWritable.class)
+          .put(2, Text.class)
+          .put(3, IntWritable.class)
+          .put(4, LongWritable.class)
+          .put(5, FloatWritable.class)
+          .put(6, DoubleWritable.class)
+          .put(7, BooleanWritable.class)
+          .put(8, TupleWritable.class)
+          .put(9, TextMapWritable.class)
+          .put(10, UnionWritable.class)
+          .build());
+
+  /**
+   * Registers a {@code WritableComparable} class so that it can be used for comparing the fields inside of
+   * tuple types (e.g., {@code pairs}, {@code trips}, {@code tupleN}, etc.) for use in sorts and
+   * secondary sorts.
+   *
+   * @param clazz The WritableComparable class to register
+   * @return the integer code that was assigned to serialized instances of this class
+   */
+  public static void registerComparable(Class<? extends WritableComparable> clazz) {
+    int code = clazz.hashCode();
+    if (code < 0) {
+      code = -code;
+    }
+    if (code < WRITABLE_CODES.size()) {
+      code += WRITABLE_CODES.size();
+    }
+    registerComparable(clazz, code);
+  }
+
+  /**
+   * Registers a {@code WritableComparable} class with a given integer code to use for serializing
+   * and deserializing instances of this class that are defined inside of tuple types (e.g., {@code pairs},
+   * {@code trips}, {@code tupleN}, etc.) Unregistered Writables are always serialized to bytes and
+   * cannot be used in comparisons (e.g., sorts and secondary sorts) according to their underlying types.
+   *
+   * @param clazz The class to register
+   * @param code  The unique registration code for the class, which must be greater than or equal to 8
+   */
+  public static void registerComparable(Class<? extends WritableComparable> clazz, int code) {
+    if (WRITABLE_CODES.containsKey(code)) {
+      throw new IllegalArgumentException("Already have writable class assigned to code = " + code);
+    }
+  }
+
+  private static final String WRITABLE_COMPARABLE_CODES = "crunch.writable.comparable.codes";
+
+  private static void serializeWritableComparableCodes(Configuration conf) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(baos);
+    oos.writeObject(WRITABLE_CODES);
+    oos.close();
+    conf.set(WRITABLE_COMPARABLE_CODES, Base64.encodeBase64String(baos.toByteArray()));
+  }
+
+  static void reloadWritableComparableCodes(Configuration conf) throws Exception {
+    if (conf.get(WRITABLE_COMPARABLE_CODES) != null) {
+      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(conf.get(WRITABLE_COMPARABLE_CODES)));
+      ObjectInputStream ois = new ObjectInputStream(bais);
+      BiMap<Integer, Class<? extends Writable>> codes = (BiMap<Integer, Class<? extends Writable>>) ois.readObject();
+      ois.close();
+      for (Map.Entry<Integer, Class<? extends Writable>> e : codes.entrySet()) {
+        WRITABLE_CODES.put(e.getKey(), e.getValue());
+      }
+    }
+  }
+
   private static final MapFn<NullWritable, Void> NULL_WRITABLE_TO_VOID = new MapFn<NullWritable, Void>() {
     @Override
     public Void map(NullWritable input) {
@@ -280,14 +363,19 @@ public class Writables {
     return new WritableTableType((WritableType) key, (WritableType) value);
   }
 
-  private static <W extends Writable> W create(Class<W> clazz, BytesWritable bytes) {
-    W instance = (W) WritableFactories.newInstance(clazz);
-    try {
-      instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes.getBytes())));
-    } catch (IOException e) {
-      throw new CrunchRuntimeException(e);
+  private static <W extends Writable> W create(Class<W> clazz, Writable writable) {
+    if (clazz.equals(writable.getClass())) {
+      return (W) writable;
+    } else {
+      W instance = (W) WritableFactories.newInstance(clazz);
+      BytesWritable bytes = (BytesWritable) writable;
+      try {
+        instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes.getBytes())));
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(e);
+      }
+      return instance;
     }
-    return instance;
   }
 
   /**
@@ -307,12 +395,26 @@ public class Writables {
       this.writableClasses = Lists.newArrayList();
       for (WritableType ptype : ptypes) {
         fns.add(ptype.getInputMapFn());
-        writableClasses.add(ptype.getSerializationClass());
+        Class<Writable> clazz = ptype.getSerializationClass();
+        if (WritableComparable.class.isAssignableFrom(clazz)) {
+          if (!WRITABLE_CODES.inverse().containsKey(clazz)) {
+            LOG.warn(String.format(
+                "WritableComparable class %s in tuple type should be registered with Writables.registerComparable",
+                clazz.toString()));
+          }
+        }
+        writableClasses.add(clazz);
       }
     }
 
     @Override
     public void configure(Configuration conf) {
+      try {
+        serializeWritableComparableCodes(conf);
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error serializing writable comparable codes", e);
+      }
+
       for (MapFn fn : fns) {
         fn.configure(conf);
       }
@@ -327,9 +429,11 @@ public class Writables {
     
     @Override
     public void initialize() {
+
       for (MapFn fn : fns) {
         fn.initialize();
       }
+
       // The rest of the methods allocate new
       // objects each time. However this one
       // uses Tuple.tuplify which does a copy
@@ -357,23 +461,28 @@ public class Writables {
    */
   private static class TupleTWMapFn extends MapFn<Tuple, TupleWritable> {
 
-    private transient TupleWritable writable;
-    private transient BytesWritable[] values;
-
     private final List<MapFn> fns;
-    private final List<Class<Writable>> writableClasses;
-    
+
+    private transient int[] written;
+    private transient Writable[] values;
+
     public TupleTWMapFn(PType<?>... ptypes) {
       this.fns = Lists.newArrayList();
-      this.writableClasses = Lists.newArrayList();
       for (PType<?> ptype : ptypes) {
         fns.add(ptype.getOutputMapFn());
-        writableClasses.add(((WritableType) ptype).getSerializationClass());
       }
+
+      this.written = new int[fns.size()];
+      this.values = new Writable[fns.size()];
     }
 
     @Override
     public void configure(Configuration conf) {
+      try {
+        serializeWritableComparableCodes(conf);
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error serializing writable comparable codes", e);
+      }
       for (MapFn fn : fns) {
         fn.configure(conf);
       }
@@ -388,26 +497,31 @@ public class Writables {
     
     @Override
     public void initialize() {
-      this.values = new BytesWritable[fns.size()];
-      this.writable = new TupleWritable(values);
-      this.writable.setWritableClasses(writableClasses);
       for (MapFn fn : fns) {
         fn.initialize();
       }
+      this.written = new int[fns.size()];
+      this.values = new Writable[fns.size()];
     }
 
     @Override
     public TupleWritable map(Tuple input) {
-      writable.clearWritten();
+      Arrays.fill(written, (byte) 0);
+      Arrays.fill(values, null);
       for (int i = 0; i < input.size(); i++) {
         Object value = input.get(i);
         if (value != null) {
-          writable.setWritten(i);
           Writable w = (Writable) fns.get(i).map(value);
-          values[i] = new BytesWritable(WritableUtils.toByteArray(w));
+          if (WRITABLE_CODES.inverse().containsKey(w.getClass())) {
+            values[i] = w;
+            written[i] = WRITABLE_CODES.inverse().get(w.getClass());
+          } else {
+            values[i] = new BytesWritable(WritableUtils.toByteArray(w));
+            written[i] = 1; // code for BytesWritable
+          }
         }
       }
-      return writable;
+      return new TupleWritable(values, written);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
index e8727c3..aee185a 100644
--- a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
@@ -24,6 +24,7 @@ import org.apache.crunch.types.writable.TupleWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,7 +42,7 @@ public class TupleWritablePartitionerTest {
   public void testGetPartition() {
     IntWritable intWritable = new IntWritable(3);
     BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(intWritable));
-    TupleWritable key = new TupleWritable(new BytesWritable[] { bw });
+    TupleWritable key = new TupleWritable(new Writable[] { bw });
     assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
     assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1a160b65/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
index b1f4107..3a6fc18 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
@@ -19,7 +19,6 @@ package org.apache.crunch.types.writable;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 
 import java.io.DataInput;
@@ -123,13 +122,7 @@ public class WritablesTest {
   public void testPairs() throws Exception {
     Pair<String, String> j = Pair.of("a", "b");
     Text[] t = new Text[] { new Text("a"), new Text("b"), };
-    BytesWritable[] b = new BytesWritable[t.length];
-    for (int i = 0; i < t.length; i++) {
-      b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
-    }
-    TupleWritable w = new TupleWritable(b);
-    w.setWritten(0);
-    w.setWritten(1);
+    TupleWritable w = new TupleWritable(t);
     testInputOutputFn(Writables.pairs(Writables.strings(), Writables.strings()), j, w);
   }
 
@@ -153,14 +146,7 @@ public class WritablesTest {
   public void testTriples() throws Exception {
     Tuple3 j = Tuple3.of("a", "b", "c");
     Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), };
-    BytesWritable[] b = new BytesWritable[t.length];
-    for (int i = 0; i < t.length; i++) {
-      b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
-    }
-    TupleWritable w = new TupleWritable(b);
-    w.setWritten(0);
-    w.setWritten(1);
-    w.setWritten(2);
+    TupleWritable w = new TupleWritable(t);
     WritableType<?, ?> wt = Writables.triples(Writables.strings(), Writables.strings(), Writables.strings());
     testInputOutputFn(wt, j, w);
   }
@@ -170,15 +156,7 @@ public class WritablesTest {
   public void testQuads() throws Exception {
     Tuple4 j = Tuple4.of("a", "b", "c", "d");
     Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"), };
-    BytesWritable[] b = new BytesWritable[t.length];
-    for (int i = 0; i < t.length; i++) {
-      b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
-    }
-    TupleWritable w = new TupleWritable(b);
-    w.setWritten(0);
-    w.setWritten(1);
-    w.setWritten(2);
-    w.setWritten(3);
+    TupleWritable w = new TupleWritable(t);
     WritableType<?, ?> wt = Writables.quads(Writables.strings(), Writables.strings(), Writables.strings(),
         Writables.strings());
     testInputOutputFn(wt, j, w);
@@ -189,16 +167,7 @@ public class WritablesTest {
     TupleN j = new TupleN("a", "b", "c", "d", "e");
     Text[] t = new Text[] { new Text("a"), new Text("b"), new Text("c"), new Text("d"),
         new Text("e"), };
-    BytesWritable[] b = new BytesWritable[t.length];
-    for (int i = 0; i < t.length; i++) {
-      b[i] = new BytesWritable(WritableUtils.toByteArray(t[i]));
-    }
-    TupleWritable w = new TupleWritable(b);
-    w.setWritten(0);
-    w.setWritten(1);
-    w.setWritten(2);
-    w.setWritten(3);
-    w.setWritten(4);
+    TupleWritable w = new TupleWritable(t);
     WritableType<?, ?> wt = Writables.tuples(Writables.strings(), Writables.strings(), Writables.strings(),
         Writables.strings(), Writables.strings());
     testInputOutputFn(wt, j, w);