Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:07 UTC

[05/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
deleted file mode 100644
index fc30eaf..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ /dev/null
@@ -1,709 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.util.Utf8;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.fn.CompositeMapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.types.CollectionDeepCopier;
-import org.apache.crunch.types.DeepCopier;
-import org.apache.crunch.types.MapDeepCopier;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypes;
-import org.apache.crunch.types.TupleDeepCopier;
-import org.apache.crunch.types.TupleFactory;
-import org.apache.crunch.types.writable.WritableDeepCopier;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Defines static methods that are analogous to the methods defined in
- * {@link AvroTypeFamily} for convenient static importing.
- * 
- */
-public class Avros {
-
-  /**
-   * Older versions of Avro (i.e., before 1.7.0) do not support schemas that are
-   * composed of a mix of specific and reflection-based schemas. This bit
-   * controls whether or not we allow Crunch jobs to be created that involve
-   * mixing specific and reflection-based schemas and can be overridden by the
-   * client developer.
-   */
-  public static final boolean CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS;
-
-  static {
-    CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS = AvroCapabilities.canDecodeSpecificSchemaWithReflectDatumReader();
-  }
-
-  /**
-   * The instance we use for generating reflected schemas. May be modified by
-   * clients (e.g., Scrunch.)
-   */
-  public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
-
-  /**
-   * The name of the configuration parameter that tracks which reflection
-   * factory to use.
-   */
-  public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
-
-  public static void configureReflectDataFactory(Configuration conf) {
-    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
-  }
-
-  public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
-    return (ReflectDataFactory) ReflectionUtils.newInstance(
-        conf.getClass(REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
-  }
-
-  public static void checkCombiningSpecificAndReflectionSchemas() {
-    if (!CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS) {
-      throw new IllegalStateException("Crunch does not support running jobs that"
-          + " contain a mixture of reflection-based and avro-generated data types."
-          + " Please consider turning your reflection-based type into an avro-generated"
-          + " type and using that generated type instead."
-          + " If the version of Avro you are using is 1.7.0 or greater, you can enable"
-          + " combined schemas by setting the Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS" + " field to 'true'.");
-    }
-  }
-
-  public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
-    @Override
-    public String map(CharSequence input) {
-      return input.toString();
-    }
-  };
-
-  public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
-    @Override
-    public Utf8 map(String input) {
-      return new Utf8(input);
-    }
-  };
-
-  public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
-    @Override
-    public ByteBuffer map(Object input) {
-      if (input instanceof ByteBuffer) {
-        return (ByteBuffer) input;
-      }
-      return ByteBuffer.wrap((byte[]) input);
-    }
-  };
-
-  private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING),
-      UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier<String>());
-  private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
-  private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
-  private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
-  private static final AvroType<Float> floats = create(Float.class, Schema.Type.FLOAT);
-  private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE);
-  private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
-  private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
-      Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), new DeepCopier.NoOpDeepCopier<ByteBuffer>());
-
-  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
-      .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
-      .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
-
-  private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
-
-  public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
-    EXTENSIONS.put(clazz, ptype);
-  }
-
-  public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
-    return (PType<T>) PRIMITIVES.get(clazz);
-  }
-
-  static <T> boolean isPrimitive(AvroType<T> avroType) {
-    return avroType.getTypeClass().isPrimitive() || PRIMITIVES.containsKey(avroType.getTypeClass());
-  }
-
-  private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
-    return new AvroType<T>(clazz, Schema.create(schemaType), new DeepCopier.NoOpDeepCopier<T>());
-  }
-
-  public static final AvroType<Void> nulls() {
-    return nulls;
-  }
-
-  public static final AvroType<String> strings() {
-    return strings;
-  }
-
-  public static final AvroType<Long> longs() {
-    return longs;
-  }
-
-  public static final AvroType<Integer> ints() {
-    return ints;
-  }
-
-  public static final AvroType<Float> floats() {
-    return floats;
-  }
-
-  public static final AvroType<Double> doubles() {
-    return doubles;
-  }
-
-  public static final AvroType<Boolean> booleans() {
-    return booleans;
-  }
-
-  public static final AvroType<ByteBuffer> bytes() {
-    return bytes;
-  }
-
-  public static final <T> AvroType<T> records(Class<T> clazz) {
-    if (EXTENSIONS.containsKey(clazz)) {
-      return (AvroType<T>) EXTENSIONS.get(clazz);
-    }
-    return containers(clazz);
-  }
-
-  public static final AvroType<GenericData.Record> generics(Schema schema) {
-    return new AvroType<GenericData.Record>(GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier(
-        schema));
-  }
-
-  public static final <T> AvroType<T> containers(Class<T> clazz) {
-    if (SpecificRecord.class.isAssignableFrom(clazz)) {
-      return (AvroType<T>) specifics((Class<SpecificRecord>) clazz);
-    }
-    return reflects(clazz);
-  }
-
-  public static final <T extends SpecificRecord> AvroType<T> specifics(Class<T> clazz) {
-    T t = ReflectionUtils.newInstance(clazz, null);
-    Schema schema = t.getSchema();
-    return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroSpecificDeepCopier<T>(clazz, schema));
-  }
-
-  public static final <T> AvroType<T> reflects(Class<T> clazz) {
-    Schema schema = REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz);
-    return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema));
-  }
-
-  private static class BytesToWritableMapFn<T extends Writable> extends MapFn<Object, T> {
-    private static final Log LOG = LogFactory.getLog(BytesToWritableMapFn.class);
-
-    private final Class<T> writableClazz;
-
-    public BytesToWritableMapFn(Class<T> writableClazz) {
-      this.writableClazz = writableClazz;
-    }
-
-    @Override
-    public T map(Object input) {
-      ByteBuffer byteBuffer = BYTES_IN.map(input);
-      T instance = ReflectionUtils.newInstance(writableClazz, null);
-      try {
-        instance.readFields(new DataInputStream(new ByteArrayInputStream(byteBuffer.array(),
-            byteBuffer.arrayOffset(), byteBuffer.limit())));
-      } catch (IOException e) {
-        LOG.error("Exception thrown reading instance of: " + writableClazz, e);
-      }
-      return instance;
-    }
-  }
-
-  private static class WritableToBytesMapFn<T extends Writable> extends MapFn<T, ByteBuffer> {
-    private static final Log LOG = LogFactory.getLog(WritableToBytesMapFn.class);
-
-    @Override
-    public ByteBuffer map(T input) {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      DataOutputStream das = new DataOutputStream(baos);
-      try {
-        input.write(das);
-      } catch (IOException e) {
-        LOG.error("Exception thrown converting Writable to bytes", e);
-      }
-      return ByteBuffer.wrap(baos.toByteArray());
-    }
-  }
-
-  public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
-    return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
-        new WritableToBytesMapFn<T>(), new WritableDeepCopier<T>(clazz));
-  }
-
-  private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
-
-    private final MapFn<Object, T> mapFn;
-
-    public GenericDataArrayToCollection(MapFn<Object, T> mapFn) {
-      this.mapFn = mapFn;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      mapFn.setContext(context);
-    }
-    
-    @Override
-    public void initialize() {
-      mapFn.initialize();
-    }
-
-    @Override
-    public Collection<T> map(Object input) {
-      Collection<T> ret = Lists.newArrayList();
-      if (input instanceof Collection) {
-        for (Object in : (Collection<Object>) input) {
-          ret.add(mapFn.map(in));
-        }
-      } else {
-        // Assume it is an array
-        Object[] arr = (Object[]) input;
-        for (Object in : arr) {
-          ret.add(mapFn.map(in));
-        }
-      }
-      return ret;
-    }
-  }
-
-  private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
-
-    private final MapFn mapFn;
-    private final String jsonSchema;
-    private transient Schema schema;
-
-    public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
-      this.mapFn = mapFn;
-      this.jsonSchema = schema.toString();
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      mapFn.setContext(context);
-    }
-    
-    @Override
-    public void initialize() {
-      mapFn.initialize();
-    }
-
-    @Override
-    public GenericData.Array<?> map(Collection<?> input) {
-      if (schema == null) {
-        schema = new Schema.Parser().parse(jsonSchema);
-      }
-      GenericData.Array array = new GenericData.Array(input.size(), schema);
-      for (Object in : input) {
-        array.add(mapFn.map(in));
-      }
-      return array;
-    }
-  }
-
-  public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
-    AvroType<T> avroType = (AvroType<T>) ptype;
-    Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
-    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
-    CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
-    return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier<T>(ptype), ptype);
-  }
-
-  private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> {
-    private final MapFn<Object, T> mapFn;
-
-    public AvroMapToMap(MapFn<Object, T> mapFn) {
-      this.mapFn = mapFn;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      mapFn.setContext(context);
-    }
-    
-    @Override
-    public void initialize() {
-      mapFn.initialize();
-    }
-
-    @Override
-    public Map<String, T> map(Map<CharSequence, Object> input) {
-      Map<String, T> out = Maps.newHashMap();
-      for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
-        out.put(e.getKey().toString(), mapFn.map(e.getValue()));
-      }
-      return out;
-    }
-  }
-
-  private static class MapToAvroMap<T> extends MapFn<Map<String, T>, Map<Utf8, Object>> {
-    private final MapFn<T, Object> mapFn;
-
-    public MapToAvroMap(MapFn<T, Object> mapFn) {
-      this.mapFn = mapFn;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      mapFn.setContext(context);
-    }
-    
-    @Override
-    public void initialize() {
-      this.mapFn.initialize();
-    }
-
-    @Override
-    public Map<Utf8, Object> map(Map<String, T> input) {
-      Map<Utf8, Object> out = Maps.newHashMap();
-      for (Map.Entry<String, T> e : input.entrySet()) {
-        out.put(new Utf8(e.getKey()), mapFn.map(e.getValue()));
-      }
-      return out;
-    }
-  }
-
-  public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
-    AvroType<T> avroType = (AvroType<T>) ptype;
-    Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
-    AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
-    MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn());
-    return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier<T>(ptype), ptype);
-  }
-
-  private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> {
-    private final TupleFactory<?> tupleFactory;
-    private final List<MapFn> fns;
-
-    private transient Object[] values;
-
-    public GenericRecordToTuple(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
-      this.tupleFactory = tupleFactory;
-      this.fns = Lists.newArrayList();
-      for (PType<?> ptype : ptypes) {
-        AvroType atype = (AvroType) ptype;
-        fns.add(atype.getInputMapFn());
-      }
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.configure(conf);
-      }
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      for (MapFn fn : fns) {
-        fn.setContext(context);
-      }
-    }
-    
-    @Override
-    public void initialize() {
-      for (MapFn fn : fns) {
-        fn.initialize();
-      }
-      this.values = new Object[fns.size()];
-      tupleFactory.initialize();
-    }
-
-    @Override
-    public Tuple map(GenericRecord input) {
-      for (int i = 0; i < values.length; i++) {
-        Object v = input.get(i);
-        if (v == null) {
-          values[i] = null;
-        } else {
-          values[i] = fns.get(i).map(v);
-        }
-      }
-      return tupleFactory.makeTuple(values);
-    }
-  }
-
-  private static class TupleToGenericRecord extends MapFn<Tuple, GenericRecord> {
-    private final List<MapFn> fns;
-    private final List<AvroType> avroTypes;
-    private final String jsonSchema;
-    private final boolean isReflect;
-    private transient Schema schema;
-
-    public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
-      this.fns = Lists.newArrayList();
-      this.avroTypes = Lists.newArrayList();
-      this.jsonSchema = schema.toString();
-      boolean reflectFound = false;
-      boolean specificFound = false;
-      for (PType ptype : ptypes) {
-        AvroType atype = (AvroType) ptype;
-        fns.add(atype.getOutputMapFn());
-        avroTypes.add(atype);
-        if (atype.hasReflect()) {
-          reflectFound = true;
-        }
-        if (atype.hasSpecific()) {
-          specificFound = true;
-        }
-      }
-      if (specificFound && reflectFound) {
-        checkCombiningSpecificAndReflectionSchemas();
-      }
-      this.isReflect = reflectFound;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.configure(conf);
-      }
-    }
- 
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      for (MapFn fn : fns) {
-        fn.setContext(getContext());
-      }
-    }
-    
-    @Override
-    public void initialize() {
-      this.schema = new Schema.Parser().parse(jsonSchema);
-      for (MapFn fn : fns) {
-        fn.initialize();
-      }
-    }
-
-    private GenericRecord createRecord() {
-      if (isReflect) {
-        return new ReflectGenericRecord(schema);
-      } else {
-        return new GenericData.Record(schema);
-      }
-    }
-
-    @Override
-    public GenericRecord map(Tuple input) {
-      GenericRecord record = createRecord();
-      for (int i = 0; i < input.size(); i++) {
-        Object v = input.get(i);
-        if (v == null) {
-          record.put(i, null);
-        } else {
-          record.put(i, fns.get(i).map(v));
-        }
-      }
-      return record;
-    }
-  }
-
-  public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
-    Schema schema = createTupleSchema(p1, p2);
-    GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2);
-    TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
-    return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), p1, p2);
-  }
-
-  public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
-    Schema schema = createTupleSchema(p1, p2, p3);
-    return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
-        new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), p1, p2, p3);
-  }
-
-  public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3,
-      PType<V4> p4) {
-    Schema schema = createTupleSchema(p1, p2, p3, p4);
-    return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
-        new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), p1, p2,
-        p3, p4);
-  }
-
-  public static final AvroType<TupleN> tuples(PType... ptypes) {
-    Schema schema = createTupleSchema(ptypes);
-    return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
-        new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), ptypes);
-  }
-
-  public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
-    Schema schema = createTupleSchema(ptypes);
-    Class[] typeArgs = new Class[ptypes.length];
-    for (int i = 0; i < typeArgs.length; i++) {
-      typeArgs[i] = ptypes[i].getTypeClass();
-    }
-    TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
-    return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema,
-        ptypes), new TupleDeepCopier(clazz, ptypes), ptypes);
-  }
-
-  private static Schema createTupleSchema(PType<?>... ptypes) {
-    // Guarantee each tuple schema has a globally unique name
-    String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
-    Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
-    List<Schema.Field> fields = Lists.newArrayList();
-    for (int i = 0; i < ptypes.length; i++) {
-      AvroType atype = (AvroType) ptypes[i];
-      Schema fieldSchema = allowNulls(atype.getSchema());
-      fields.add(new Schema.Field("v" + i, fieldSchema, "", null));
-    }
-    schema.setFields(fields);
-    return schema;
-  }
-
-  public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
-      PType<S> base) {
-    AvroType<S> abase = (AvroType<S>) base;
-    return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn),
-        new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier<T>(), base.getSubTypes()
-            .toArray(new PType[0]));
-  }
-
-  public static <T> PType<T> jsons(Class<T> clazz) {
-    return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
-  }
-
-  public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key, PType<V> value) {
-    if (key instanceof PTableType) {
-      PTableType ptt = (PTableType) key;
-      key = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
-    }
-    if (value instanceof PTableType) {
-      PTableType ptt = (PTableType) value;
-      value = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
-    }
-    AvroType<K> avroKey = (AvroType<K>) key;
-    AvroType<V> avroValue = (AvroType<V>) value;
-    return new AvroTableType(avroKey, avroValue, Pair.class);
-  }
-
-  private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
-
-  private static Schema allowNulls(Schema base) {
-    if (NULL_SCHEMA.equals(base)) {
-      return base;
-    }
-    return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA));
-  }
-
-  private static class ReflectGenericRecord extends GenericData.Record {
-
-    public ReflectGenericRecord(Schema schema) {
-      super(schema);
-    }
-
-    @Override
-    public int hashCode() {
-      return reflectAwareHashCode(this, getSchema());
-    }
-  }
-
-  /*
-   * TODO: Remove this once we no longer have to support 1.5.4.
-   */
-  private static int reflectAwareHashCode(Object o, Schema s) {
-    if (o == null)
-      return 0; // incomplete datum
-    int hashCode = 1;
-    switch (s.getType()) {
-    case RECORD:
-      for (Schema.Field f : s.getFields()) {
-        if (f.order() == Schema.Field.Order.IGNORE)
-          continue;
-        hashCode = hashCodeAdd(hashCode, ReflectData.get().getField(o, f.name(), f.pos()), f.schema());
-      }
-      return hashCode;
-    case ARRAY:
-      Collection<?> a = (Collection<?>) o;
-      Schema elementType = s.getElementType();
-      for (Object e : a)
-        hashCode = hashCodeAdd(hashCode, e, elementType);
-      return hashCode;
-    case UNION:
-      return reflectAwareHashCode(o, s.getTypes().get(ReflectData.get().resolveUnion(s, o)));
-    case ENUM:
-      return s.getEnumOrdinal(o.toString());
-    case NULL:
-      return 0;
-    case STRING:
-      return (o instanceof Utf8 ? o : new Utf8(o.toString())).hashCode();
-    default:
-      return o.hashCode();
-    }
-  }
-
-  /** Add the hash code for an object into an accumulated hash code. */
-  private static int hashCodeAdd(int hashCode, Object o, Schema s) {
-    return 31 * hashCode + reflectAwareHashCode(o, s);
-  }
-
-  private Avros() {
-  }
-}
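
For reference, a minimal sketch (illustrative, not part of this change) of how the static factory methods deleted above are typically used from client code via static importing, as the class javadoc describes. All method names come from the file above; the printed schema is just an example.

import static org.apache.crunch.types.avro.Avros.*;

import java.util.Collection;

import org.apache.crunch.Pair;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.avro.AvroTableType;
import org.apache.crunch.types.avro.AvroType;

public class AvrosUsageSketch {
  public static void main(String[] args) {
    // Primitive PTypes backed by the corresponding Avro schemas.
    AvroType<String> str = strings();
    AvroType<Long> num = longs();

    // Composite PTypes: pairs, collections, and key/value tables.
    AvroType<Pair<String, Long>> pair = pairs(strings(), longs());
    PType<Collection<String>> coll = collections(strings());
    AvroTableType<String, Long> table = tableOf(strings(), longs());

    // Reflection-based records for an arbitrary POJO (MyRecord is hypothetical):
    // AvroType<MyRecord> rec = reflects(MyRecord.class);

    System.out.println(pair.getSchema());
  }
}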

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
deleted file mode 100644
index e973cca..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-
-/**
- * A Factory class for constructing Avro reflection-related objects.
- */
-public class ReflectDataFactory {
-
-  public ReflectData getReflectData() {
-    return ReflectData.AllowNull.get();
-  }
-
-  public <T> ReflectDatumReader<T> getReader(Schema schema) {
-    return new ReflectDatumReader<T>(schema);
-  }
-
-  public <T> ReflectDatumWriter<T> getWriter(Schema schema) {
-    return new ReflectDatumWriter<T>(schema);
-  }
-}
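
A small sketch of how the factory above is exercised; java.lang.String is used purely for illustration when deriving a reflect schema.

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.crunch.types.avro.ReflectDataFactory;

public class ReflectDataFactorySketch {
  public static void main(String[] args) {
    ReflectDataFactory factory = new ReflectDataFactory();
    // getReflectData() returns the null-tolerant ReflectData instance.
    ReflectData reflectData = factory.getReflectData();
    // Derive a reflect schema for some class (String here, just for illustration).
    Schema schema = reflectData.getSchema(String.class);
    ReflectDatumReader<String> reader = factory.getReader(schema);
    ReflectDatumWriter<String> writer = factory.getWriter(schema);
    System.out.println(schema);
  }
}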

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
deleted file mode 100644
index 8bd18b0..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.mapred.Pair;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
-class SafeAvroSerialization<T> extends Configured implements Serialization<AvroWrapper<T>> {
-
-  public boolean accept(Class<?> c) {
-    return AvroWrapper.class.isAssignableFrom(c);
-  }
-
-  /**
-   * Returns the specified map output deserializer. Defaults to the final output
-   * deserializer if no map output schema was specified.
-   */
-  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
-    boolean isKey = AvroKey.class.isAssignableFrom(c);
-    Configuration conf = getConf();
-    Schema schema = isKey ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob
-        .getMapOutputSchema(conf));
-
-    DatumReader<T> datumReader = null;
-    if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
-      ReflectDataFactory factory = (ReflectDataFactory) ReflectionUtils.newInstance(
-          conf.getClass("crunch.reflectdatafactory", ReflectDataFactory.class), conf);
-      datumReader = factory.getReader(schema);
-    } else {
-      datumReader = new SpecificDatumReader<T>(schema);
-    }
-    return new AvroWrapperDeserializer(datumReader, isKey);
-  }
-
-  private static final DecoderFactory FACTORY = DecoderFactory.get();
-
-  private class AvroWrapperDeserializer implements Deserializer<AvroWrapper<T>> {
-
-    private DatumReader<T> reader;
-    private BinaryDecoder decoder;
-    private boolean isKey;
-
-    public AvroWrapperDeserializer(DatumReader<T> reader, boolean isKey) {
-      this.reader = reader;
-      this.isKey = isKey;
-    }
-
-    public void open(InputStream in) {
-      this.decoder = FACTORY.directBinaryDecoder(in, decoder);
-    }
-
-    public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper) throws IOException {
-      T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder);
-      if (wrapper == null) {
-        wrapper = isKey ? new AvroKey<T>(datum) : new AvroValue<T>(datum);
-      } else {
-        wrapper.datum(datum);
-      }
-      return wrapper;
-    }
-
-    public void close() throws IOException {
-      decoder.inputStream().close();
-    }
-  }
-
-  /** Returns the specified output serializer. */
-  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
-    // AvroWrapper used for final output, AvroKey or AvroValue for map output
-    boolean isFinalOutput = c.equals(AvroWrapper.class);
-    Configuration conf = getConf();
-    Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf) : (AvroKey.class.isAssignableFrom(c) ? Pair
-        .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
-
-    ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-    ReflectDatumWriter<T> writer = factory.getWriter(schema);
-    return new AvroWrapperSerializer(writer);
-  }
-
-  private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {
-    private DatumWriter<T> writer;
-    private OutputStream out;
-    private BinaryEncoder encoder;
-
-    public AvroWrapperSerializer(DatumWriter<T> writer) {
-      this.writer = writer;
-    }
-
-    public void open(OutputStream out) {
-      this.out = out;
-      this.encoder = new EncoderFactory().configureBlockSize(512).binaryEncoder(out, null);
-    }
-
-    public void serialize(AvroWrapper<T> wrapper) throws IOException {
-      writer.write(wrapper.datum(), encoder);
-      // would be a lot faster if the Serializer interface had a flush()
-      // method and the Hadoop framework called it when needed rather
-      // than for every record.
-      encoder.flush();
-    }
-
-    public void close() throws IOException {
-      out.close();
-    }
-  }
-
-}
\ No newline at end of file
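
The class above plugs into Hadoop's pluggable serialization mechanism; Crunch registers it internally when Avro types are in play. The snippet below is only a sketch, under the assumption that one wanted to add a Serialization implementation to a Configuration by hand via the standard io.serializations key.

import org.apache.hadoop.conf.Configuration;

public class SerializationRegistrationSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Append the serialization to Hadoop's comma-separated io.serializations list.
    String existing = conf.get("io.serializations", "");
    conf.set("io.serializations",
        existing + ",org.apache.crunch.types.avro.SafeAvroSerialization");
    System.out.println(conf.get("io.serializations"));
  }
}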

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/package-info.java b/crunch/src/main/java/org/apache/crunch/types/avro/package-info.java
deleted file mode 100644
index abaf60f..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Business object serialization using Apache Avro.
- */
-package org.apache.crunch.types.avro;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/package-info.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/package-info.java b/crunch/src/main/java/org/apache/crunch/types/package-info.java
deleted file mode 100644
index b420b03..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Common functionality for business object serialization.
- */
-package org.apache.crunch.types;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
deleted file mode 100644
index 8b54008..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * A {@link Writable} for marshalling/unmarshalling Collections. Note that
- * element order is <em>undefined</em>!
- *
- * @param <T> The value type
- */
-class GenericArrayWritable<T> implements Writable {
-  private Writable[] values;
-  private Class<? extends Writable> valueClass;
-
-  public GenericArrayWritable(Class<? extends Writable> valueClass) {
-    this.valueClass = valueClass;
-  }
-
-  public GenericArrayWritable() {
-    // for deserialization
-  }
-
-  public void set(Writable[] values) {
-    this.values = values;
-  }
-
-  public Writable[] get() {
-    return values;
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    values = new Writable[WritableUtils.readVInt(in)]; // construct values
-    if (values.length > 0) {
-      int nulls = WritableUtils.readVInt(in);
-      if (nulls == values.length) {
-        return;
-      }
-      String valueType = Text.readString(in);
-      setValueType(valueType);
-      for (int i = 0; i < values.length - nulls; i++) {
-        Writable value = WritableFactories.newInstance(valueClass);
-        value.readFields(in); // read a value
-        values[i] = value; // store it in values
-      }
-    }
-  }
-
-  protected void setValueType(String valueType) {
-    if (valueClass == null) {
-      try {
-        valueClass = Class.forName(valueType).asSubclass(Writable.class);
-      } catch (ClassNotFoundException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    } else if (!valueType.equals(valueClass.getName())) {
-      throw new IllegalStateException("Incoming " + valueType + " is not " + valueClass);
-    }
-  }
-
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeVInt(out, values.length);
-    if (values.length > 0) {
-      int nulls = 0;
-      for (int i = 0; i < values.length; i++) {
-        if (values[i] == null) {
-          nulls++;
-        }
-      }
-      WritableUtils.writeVInt(out, nulls);
-      if (values.length - nulls > 0) {
-        if (valueClass == null) {
-          throw new IllegalStateException("Value class not set by constructor or read");
-        }
-        Text.writeString(out, valueClass.getName());
-        for (int i = 0; i < values.length; i++) {
-          if (values[i] != null) {
-            values[i].write(out);
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(values).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    GenericArrayWritable other = (GenericArrayWritable) obj;
-    if (!Arrays.equals(values, other.values))
-      return false;
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    return Arrays.toString(values);
-  }
-}
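
A brief round-trip sketch of the marshalling behaviour described in the javadoc above (serialize with write, rebuild with readFields). The class is package-private, so the sketch assumes it lives in the same package; the values are illustrative.

package org.apache.crunch.types.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class GenericArrayWritableSketch {
  public static void main(String[] args) throws Exception {
    GenericArrayWritable<Text> array = new GenericArrayWritable<Text>(Text.class);
    array.set(new Writable[] { new Text("a"), null, new Text("b") });

    // write() emits a vint length, a vint null count, the value class name,
    // and then the non-null values.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    array.write(new DataOutputStream(bytes));

    // Rebuild into a fresh instance; remember that element order is undefined.
    GenericArrayWritable<Text> copy = new GenericArrayWritable<Text>();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy);
  }
}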

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
deleted file mode 100644
index 1ab51df..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import com.google.common.collect.Maps;
-
-class TextMapWritable<T extends Writable> implements Writable {
-
-  private Class<T> valueClazz;
-  private final Map<Text, T> instance;
-
-  public TextMapWritable() {
-    this.instance = Maps.newHashMap();
-  }
-
-  public TextMapWritable(Class<T> valueClazz) {
-    this.valueClazz = valueClazz;
-    this.instance = Maps.newHashMap();
-  }
-
-  public void put(Text txt, T value) {
-    instance.put(txt, value);
-  }
-
-  public Set<Map.Entry<Text, T>> entrySet() {
-    return instance.entrySet();
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    instance.clear();
-    try {
-      this.valueClazz = (Class<T>) Class.forName(Text.readString(in));
-    } catch (ClassNotFoundException e) {
-      throw (IOException) new IOException("Failed map init").initCause(e);
-    }
-    int entries = WritableUtils.readVInt(in);
-    try {
-      for (int i = 0; i < entries; i++) {
-        Text txt = new Text();
-        txt.readFields(in);
-        T value = valueClazz.newInstance();
-        value.readFields(in);
-        instance.put(txt, value);
-      }
-    } catch (IllegalAccessException e) {
-      throw (IOException) new IOException("Failed map init").initCause(e);
-    } catch (InstantiationException e) {
-      throw (IOException) new IOException("Failed map init").initCause(e);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    Text.writeString(out, valueClazz.getName());
-    WritableUtils.writeVInt(out, instance.size());
-    for (Map.Entry<Text, T> e : instance.entrySet()) {
-      e.getKey().write(out);
-      e.getValue().write(out);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
deleted file mode 100644
index 1c3536b..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * A straight copy of the TupleWritable implementation in the join package,
- * added here because of its package visibility restrictions.
- * 
- */
-public class TupleWritable implements WritableComparable<TupleWritable> {
-
-  private long written;
-  private Writable[] values;
-
-  /**
-   * Create an empty tuple with no allocated storage for writables.
-   */
-  public TupleWritable() {
-  }
-
-  /**
-   * Initialize tuple with storage; unknown whether any of them contain
-   * &quot;written&quot; values.
-   */
-  public TupleWritable(Writable[] vals) {
-    written = 0L;
-    values = vals;
-  }
-
-  /**
-   * Return true if tuple has an element at the position provided.
-   */
-  public boolean has(int i) {
-    return 0 != ((1 << i) & written);
-  }
-
-  /**
-   * Get ith Writable from Tuple.
-   */
-  public Writable get(int i) {
-    return values[i];
-  }
-
-  /**
-   * The number of children in this Tuple.
-   */
-  public int size() {
-    return values.length;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public boolean equals(Object other) {
-    if (other instanceof TupleWritable) {
-      TupleWritable that = (TupleWritable) other;
-      if (this.size() != that.size() || this.written != that.written) {
-        return false;
-      }
-      for (int i = 0; i < values.length; ++i) {
-        if (!has(i))
-          continue;
-        if (!values[i].equals(that.get(i))) {
-          return false;
-        }
-      }
-      return true;
-    }
-    return false;
-  }
-
-  public int hashCode() {
-    HashCodeBuilder builder = new HashCodeBuilder();
-    builder.append(written);
-    for (Writable v : values) {
-      builder.append(v);
-    }
-    return builder.toHashCode();
-  }
-
-  /**
-   * Convert Tuple to String as in the following.
-   * <tt>[<child1>,<child2>,...,<childn>]</tt>
-   */
-  public String toString() {
-    StringBuffer buf = new StringBuffer("[");
-    for (int i = 0; i < values.length; ++i) {
-      buf.append(has(i) ? values[i].toString() : "");
-      buf.append(",");
-    }
-    if (values.length != 0)
-      buf.setCharAt(buf.length() - 1, ']');
-    else
-      buf.append(']');
-    return buf.toString();
-  }
-
-  /**
-   * Writes each Writable to <code>out</code>. TupleWritable format:
-   * {@code
-   *  <count><type1><type2>...<typen><obj1><obj2>...<objn>
-   * }
-   */
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeVInt(out, values.length);
-    WritableUtils.writeVLong(out, written);
-    for (int i = 0; i < values.length; ++i) {
-      if (has(i)) {
-        Text.writeString(out, values[i].getClass().getName());
-      }
-    }
-    for (int i = 0; i < values.length; ++i) {
-      if (has(i)) {
-        values[i].write(out);
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @SuppressWarnings("unchecked")
-  // No static typeinfo on Tuples
-  public void readFields(DataInput in) throws IOException {
-    int card = WritableUtils.readVInt(in);
-    values = new Writable[card];
-    written = WritableUtils.readVLong(in);
-    Class<? extends Writable>[] cls = new Class[card];
-    try {
-      for (int i = 0; i < card; ++i) {
-        if (has(i)) {
-          cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
-        }
-      }
-      for (int i = 0; i < card; ++i) {
-        if (has(i)) {
-          values[i] = cls[i].newInstance();
-          values[i].readFields(in);
-        }
-      }
-    } catch (ClassNotFoundException e) {
-      throw (IOException) new IOException("Failed tuple init").initCause(e);
-    } catch (IllegalAccessException e) {
-      throw (IOException) new IOException("Failed tuple init").initCause(e);
-    } catch (InstantiationException e) {
-      throw (IOException) new IOException("Failed tuple init").initCause(e);
-    }
-  }
-
-  /**
-   * Record that the tuple contains an element at the position provided.
-   */
-  public void setWritten(int i) {
-    written |= 1 << i;
-  }
-
-  /**
-   * Record that the tuple does not contain an element at the position provided.
-   */
-  public void clearWritten(int i) {
-    written &= -1 ^ (1 << i);
-  }
-
-  /**
-   * Clear any record of which writables have been written to, without releasing
-   * storage.
-   */
-  public void clearWritten() {
-    written = 0L;
-  }
-
-  @Override
-  public int compareTo(TupleWritable o) {
-    for (int i = 0; i < values.length; ++i) {
-      if (has(i) && !o.has(i)) {
-        return 1;
-      } else if (!has(i) && o.has(i)) {
-        return -1;
-      } else {
-        Writable v1 = values[i];
-        Writable v2 = o.values[i];
-        if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
-          if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
-            int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
-            if (cmp != 0) {
-              return cmp;
-            }
-          } else {
-            int cmp = v1.hashCode() - v2.hashCode();
-            if (cmp != 0) {
-              return cmp;
-            }
-          }
-        }
-      }
-    }
-    return values.length - o.values.length;
-  }
-}
\ No newline at end of file
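
A small sketch exercising the TupleWritable API above: a field is only serialized once it is flagged with setWritten, matching the write format documented in the class. The values here are illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.crunch.types.writable.TupleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class TupleWritableSketch {
  public static void main(String[] args) throws Exception {
    TupleWritable tuple = new TupleWritable(new Writable[] { new Text("key"), new IntWritable(42) });
    tuple.setWritten(0);
    tuple.setWritten(1);

    // write() emits the count, the written-bitmask, the class names of the
    // written positions, and then the written values themselves.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    tuple.write(new DataOutputStream(bytes));

    TupleWritable copy = new TupleWritable();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy); // [key,42]
  }
}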

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
deleted file mode 100644
index 7b6e11b..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.types.DeepCopier;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Performs deep copies of Writable values.
- * 
- * @param <T> The type of Writable that can be copied
- */
-public class WritableDeepCopier<T extends Writable> implements DeepCopier<T> {
-
-  private Class<T> writableClass;
-
-  public WritableDeepCopier(Class<T> writableClass) {
-    this.writableClass = writableClass;
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-  }
-
-  @Override
-  public T deepCopy(T source) {
-    
-    if (source == null) {
-      return null;
-    }
-    
-    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
-    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
-    T copiedValue = null;
-    try {
-      source.write(dataOut);
-      dataOut.flush();
-      ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
-      DataInput dataInput = new DataInputStream(byteInStream);
-      copiedValue = writableClass.newInstance();
-      copiedValue.readFields(dataInput);
-    } catch (Exception e) {
-      throw new CrunchRuntimeException("Error while deep copying " + source, e);
-    }
-    return copiedValue;
-  }
-}
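
A quick sketch of the deep-copy contract above: the copy goes through write()/readFields(), so mutating it does not touch the source. The Text value is illustrative.

import org.apache.crunch.types.writable.WritableDeepCopier;
import org.apache.hadoop.io.Text;

public class WritableDeepCopierSketch {
  public static void main(String[] args) {
    WritableDeepCopier<Text> copier = new WritableDeepCopier<Text>(Text.class);
    Text original = new Text("hello");
    Text copy = copier.deepCopy(original);
    copy.set("changed");
    // The original is unaffected because the copy was serialized and re-read.
    System.out.println(original + " / " + copy); // hello / changed
  }
}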

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
deleted file mode 100644
index 84318d3..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
-
-  private final MapFn inputFn;
-  private final MapFn outputFn;
-  private final Converter converter;
-
-  public WritableGroupedTableType(WritableTableType<K, V> tableType) {
-    super(tableType);
-    WritableType keyType = (WritableType) tableType.getKeyType();
-    WritableType valueType = (WritableType) tableType.getValueType();
-    this.inputFn = new PairIterableMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
-    this.outputFn = tableType.getOutputMapFn();
-    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
-        valueType.getSerializationClass());
-  }
-
-  @Override
-  public Class<Pair<K, Iterable<V>>> getTypeClass() {
-    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();
-  }
-
-  @Override
-  public Converter getGroupingConverter() {
-    return converter;
-  }
-
-  @Override
-  public MapFn getInputMapFn() {
-    return inputFn;
-  }
-
-  @Override
-  public MapFn getOutputMapFn() {
-    return outputFn;
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    this.tableType.initialize(conf);
-  }
-
-  @Override
-  public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
-    return PTables.getGroupedDetachedValue(this, value);
-  }
-
-  @Override
-  public void configureShuffle(Job job, GroupingOptions options) {
-    if (options != null) {
-      options.configure(job);
-    }
-    WritableType keyType = (WritableType) tableType.getKeyType();
-    WritableType valueType = (WritableType) tableType.getValueType();
-    job.setMapOutputKeyClass(keyType.getSerializationClass());
-    job.setMapOutputValueClass(valueType.getSerializationClass());
-  }
-}
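
As a hedged sketch of how the removed grouped-table type is normally reached (class name ShuffleConfigExample is invented; a Hadoop 2 style Job.getInstance is assumed), the grouped type is derived from a table type built via the Writables factory methods referenced elsewhere in this patch, and configureShuffle wires the map output classes onto the job:

    import org.apache.crunch.types.PGroupedTableType;
    import org.apache.crunch.types.PTableType;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ShuffleConfigExample {
      public static void main(String[] args) throws Exception {
        PTableType<String, Long> tableType =
            Writables.tableOf(Writables.strings(), Writables.longs());
        PGroupedTableType<String, Long> groupedType = tableType.getGroupedTableType();

        Job job = Job.getInstance(new Configuration());
        // With no GroupingOptions, this only sets the map output key/value classes
        // (the Writable serialization classes of the key and value types).
        groupedType.configureShuffle(job, null);
        System.out.println(job.getMapOutputKeyClass());  // should be Text for the string key type
      }
    }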

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
deleted file mode 100644
index 2db0238..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.Converter;
-
-class WritablePairConverter<K, V> implements Converter<K, V, Pair<K, V>, Pair<K, Iterable<V>>> {
-
-  private final Class<K> keyClass;
-  private final Class<V> valueClass;
-
-  public WritablePairConverter(Class<K> keyClass, Class<V> valueClass) {
-    this.keyClass = keyClass;
-    this.valueClass = valueClass;
-  }
-
-  @Override
-  public Pair<K, V> convertInput(K key, V value) {
-    return Pair.of(key, value);
-  }
-
-  @Override
-  public K outputKey(Pair<K, V> value) {
-    return value.first();
-  }
-
-  @Override
-  public V outputValue(Pair<K, V> value) {
-    return value.second();
-  }
-
-  @Override
-  public Class<K> getKeyClass() {
-    return keyClass;
-  }
-
-  @Override
-  public Class<V> getValueClass() {
-    return valueClass;
-  }
-
-  @Override
-  public Pair<K, Iterable<V>> convertIterableInput(K key, Iterable<V> value) {
-    return Pair.of(key, value);
-  }
-}
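
Although package-private and purely internal, the removed converter's contract is easy to illustrate: it bridges Hadoop's (key, value) records and Crunch's Pair objects around a shuffle. A small sketch, assuming it is compiled inside org.apache.crunch.types.writable (class name PairConverterSketch and the sample values are invented):

    package org.apache.crunch.types.writable;

    import org.apache.crunch.Pair;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class PairConverterSketch {
      public static void main(String[] args) {
        WritablePairConverter<Text, LongWritable> converter =
            new WritablePairConverter<Text, LongWritable>(Text.class, LongWritable.class);

        // Map-side: a Hadoop (key, value) record becomes a Crunch Pair.
        Pair<Text, LongWritable> pair =
            converter.convertInput(new Text("word"), new LongWritable(42L));

        // Shuffle-side: the Pair is split back into its key and value.
        Text key = converter.outputKey(pair);              // "word"
        LongWritable value = converter.outputValue(pair);  // 42
        System.out.println(key + " -> " + value);
      }
    }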

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
deleted file mode 100644
index 93e0fd6..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.util.List;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.fn.PairMapFn;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.ImmutableList;
-
-class WritableTableType<K, V> implements PTableType<K, V> {
-
-  private final WritableType<K, Writable> keyType;
-  private final WritableType<V, Writable> valueType;
-  private final MapFn inputFn;
-  private final MapFn outputFn;
-  private final Converter converter;
-
-  public WritableTableType(WritableType<K, Writable> keyType, WritableType<V, Writable> valueType) {
-    this.keyType = keyType;
-    this.valueType = valueType;
-    this.inputFn = new PairMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
-    this.outputFn = new PairMapFn(keyType.getOutputMapFn(), valueType.getOutputMapFn());
-    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
-        valueType.getSerializationClass());
-  }
-
-  @Override
-  public Class<Pair<K, V>> getTypeClass() {
-    return (Class<Pair<K, V>>) Pair.of(null, null).getClass();
-  }
-
-  @Override
-  public List<PType> getSubTypes() {
-    return ImmutableList.<PType> of(keyType, valueType);
-  }
-
-  @Override
-  public MapFn getInputMapFn() {
-    return inputFn;
-  }
-
-  @Override
-  public MapFn getOutputMapFn() {
-    return outputFn;
-  }
-
-  @Override
-  public Converter getConverter() {
-    return converter;
-  }
-
-  @Override
-  public PTypeFamily getFamily() {
-    return WritableTypeFamily.getInstance();
-  }
-
-  public PType<K> getKeyType() {
-    return keyType;
-  }
-
-  public PType<V> getValueType() {
-    return valueType;
-  }
-
-  @Override
-  public PGroupedTableType<K, V> getGroupedTableType() {
-    return new WritableGroupedTableType<K, V>(this);
-  }
-
-  @Override
-  public ReadableSourceTarget<Pair<K, V>> getDefaultFileSource(Path path) {
-    return new SeqFileTableSourceTarget<K, V>(path, this);
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    keyType.initialize(conf);
-    valueType.initialize(conf);
-  }
-
-  @Override
-  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
-    return PTables.getDetachedValue(this, value);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null || !(obj instanceof WritableTableType)) {
-      return false;
-    }
-    WritableTableType that = (WritableTableType) obj;
-    return keyType.equals(that.keyType) && valueType.equals(that.valueType);
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(keyType).append(valueType).toHashCode();
-  }
-}
\ No newline at end of file
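
For context, a minimal sketch (class name TableTypeSketch and the Path are invented for illustration) of how the removed table type is normally obtained and what it exposes; Writables.tableOf builds a WritableTableType under the hood:

    import org.apache.crunch.types.PTableType;
    import org.apache.crunch.types.PType;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.fs.Path;

    public class TableTypeSketch {
      public static void main(String[] args) {
        PTableType<String, Long> tableType =
            Writables.tableOf(Writables.strings(), Writables.longs());

        PType<String> keyType = tableType.getKeyType();    // the Writable string type
        PType<Long> valueType = tableType.getValueType();  // the Writable long type

        // As the code above shows, the default file representation for a
        // Writable table is a SequenceFile source/target.
        System.out.println(tableType.getDefaultFileSource(new Path("/tmp/example")));
      }
    }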

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
deleted file mode 100644
index 734946c..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.util.List;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.io.seq.SeqFileSourceTarget;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.DeepCopier;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.ImmutableList;
-
-public class WritableType<T, W extends Writable> implements PType<T> {
-
-  private final Class<T> typeClass;
-  private final Class<W> writableClass;
-  private final Converter converter;
-  private final MapFn<W, T> inputFn;
-  private final MapFn<T, W> outputFn;
-  private final DeepCopier<W> deepCopier;
-  private final List<PType> subTypes;
-  private boolean initialized = false;
-
-  public WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> inputDoFn,
-      MapFn<T, W> outputDoFn, PType... subTypes) {
-    this.typeClass = typeClass;
-    this.writableClass = writableClass;
-    this.inputFn = inputDoFn;
-    this.outputFn = outputDoFn;
-    this.converter = new WritableValueConverter(writableClass);
-    this.deepCopier = new WritableDeepCopier<W>(writableClass);
-    this.subTypes = ImmutableList.<PType> builder().add(subTypes).build();
-  }
-
-  @Override
-  public PTypeFamily getFamily() {
-    return WritableTypeFamily.getInstance();
-  }
-
-  @Override
-  public Class<T> getTypeClass() {
-    return typeClass;
-  }
-
-  @Override
-  public Converter getConverter() {
-    return converter;
-  }
-
-  @Override
-  public MapFn getInputMapFn() {
-    return inputFn;
-  }
-
-  @Override
-  public MapFn getOutputMapFn() {
-    return outputFn;
-  }
-
-  @Override
-  public List<PType> getSubTypes() {
-    return subTypes;
-  }
-
-  public Class<W> getSerializationClass() {
-    return writableClass;
-  }
-
-  @Override
-  public ReadableSourceTarget<T> getDefaultFileSource(Path path) {
-    return new SeqFileSourceTarget<T>(path, this);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null || !(obj instanceof WritableType)) {
-      return false;
-    }
-    WritableType wt = (WritableType) obj;
-    return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) && subTypes
-        .equals(wt.subTypes));
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    this.inputFn.initialize();
-    this.outputFn.initialize();
-    for (PType subType : subTypes) {
-      subType.initialize(conf);
-    }
-    this.initialized = true;
-  }
-
-  @Override
-  public T getDetachedValue(T value) {
-    if (!initialized) {
-      throw new IllegalStateException("Cannot call getDetachedValue on an uninitialized PType");
-    }
-    W writableValue = outputFn.map(value);
-    W deepCopy = this.deepCopier.deepCopy(writableValue);
-    return inputFn.map(deepCopy);
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    hcb.append(typeClass).append(writableClass).append(subTypes);
-    return hcb.toHashCode();
-  }
-}
\ No newline at end of file
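
The getDetachedValue contract above deserves a concrete sketch, since Hadoop reuses Writable instances between records and a detached copy must not share state with those buffers. The class name DetachedValueSketch is invented; Writables.strings() is the usual way to obtain this WritableType:

    import org.apache.crunch.types.PType;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.conf.Configuration;

    public class DetachedValueSketch {
      public static void main(String[] args) {
        PType<String> strings = Writables.strings();
        // Required: as shown above, calling getDetachedValue on an
        // uninitialized PType throws IllegalStateException.
        strings.initialize(new Configuration());

        // The value is round-tripped through the output MapFn, the
        // WritableDeepCopier, and the input MapFn, so the result is
        // independent of any reused Writable instance.
        String detached = strings.getDetachedValue("hello");
        System.out.println(detached);
      }
    }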

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
deleted file mode 100644
index a94db96..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.PTypeUtils;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The {@link Writable}-based implementation of the
- * {@link org.apache.crunch.types.PTypeFamily} interface.
- */
-public class WritableTypeFamily implements PTypeFamily {
-
-  private static final WritableTypeFamily INSTANCE = new WritableTypeFamily();
-
-  public static WritableTypeFamily getInstance() {
-    return INSTANCE;
-  }
-
-  // Disallow construction
-  private WritableTypeFamily() {
-  }
-
-  public PType<Void> nulls() {
-    return Writables.nulls();
-  }
-
-  public PType<String> strings() {
-    return Writables.strings();
-  }
-
-  public PType<Long> longs() {
-    return Writables.longs();
-  }
-
-  public PType<Integer> ints() {
-    return Writables.ints();
-  }
-
-  public PType<Float> floats() {
-    return Writables.floats();
-  }
-
-  public PType<Double> doubles() {
-    return Writables.doubles();
-  }
-
-  public PType<Boolean> booleans() {
-    return Writables.booleans();
-  }
-
-  public PType<ByteBuffer> bytes() {
-    return Writables.bytes();
-  }
-
-  public <T> PType<T> records(Class<T> clazz) {
-    return Writables.records(clazz);
-  }
-
-  public <W extends Writable> PType<W> writables(Class<W> clazz) {
-    return Writables.writables(clazz);
-  }
-
-  public <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value) {
-    return Writables.tableOf(key, value);
-  }
-
-  public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
-    return Writables.pairs(p1, p2);
-  }
-
-  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
-    return Writables.triples(p1, p2, p3);
-  }
-
-  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
-    return Writables.quads(p1, p2, p3, p4);
-  }
-
-  public PType<TupleN> tuples(PType<?>... ptypes) {
-    return Writables.tuples(ptypes);
-  }
-
-  public <T> PType<Collection<T>> collections(PType<T> ptype) {
-    return Writables.collections(ptype);
-  }
-
-  public <T> PType<Map<String, T>> maps(PType<T> ptype) {
-    return Writables.maps(ptype);
-  }
-
-  @Override
-  public <T> PType<T> as(PType<T> ptype) {
-    if (ptype instanceof WritableType || ptype instanceof WritableTableType
-        || ptype instanceof WritableGroupedTableType) {
-      return ptype;
-    }
-    if (ptype instanceof PGroupedTableType) {
-      PTableType ptt = ((PGroupedTableType) ptype).getTableType();
-      return new WritableGroupedTableType((WritableTableType) as(ptt));
-    }
-    PType<T> prim = Writables.getPrimitiveType(ptype.getTypeClass());
-    if (prim != null) {
-      return prim;
-    }
-    return PTypeUtils.convert(ptype, this);
-  }
-
-  @Override
-  public <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes) {
-    return Writables.tuples(clazz, ptypes);
-  }
-
-  @Override
-  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
-    return Writables.derived(clazz, inputFn, outputFn, base);
-  }
-}
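
Because every method here delegates to Writables, the family object is mainly useful for writing code that stays agnostic to the serialization format. A hedged sketch (class and method names are invented; the same method should work if handed the Avro family instead):

    import org.apache.crunch.types.PTableType;
    import org.apache.crunch.types.PTypeFamily;
    import org.apache.crunch.types.writable.WritableTypeFamily;

    public class TypeFamilySketch {
      // Serialization-agnostic: only PTypeFamily methods are used.
      static PTableType<String, Long> wordCountType(PTypeFamily tf) {
        return tf.tableOf(tf.strings(), tf.longs());
      }

      public static void main(String[] args) {
        PTableType<String, Long> tt = wordCountType(WritableTypeFamily.getInstance());
        System.out.println(tt.getFamily());  // the WritableTypeFamily singleton
      }
    }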

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
deleted file mode 100644
index 3670b90..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import org.apache.crunch.types.Converter;
-import org.apache.hadoop.io.NullWritable;
-
-class WritableValueConverter<W> implements Converter<Object, W, W, Iterable<W>> {
-
-  private final Class<W> serializationClass;
-
-  public WritableValueConverter(Class<W> serializationClass) {
-    this.serializationClass = serializationClass;
-  }
-
-  @Override
-  public W convertInput(Object key, W value) {
-    return value;
-  }
-
-  @Override
-  public Object outputKey(W value) {
-    return NullWritable.get();
-  }
-
-  @Override
-  public W outputValue(W value) {
-    return value;
-  }
-
-  @Override
-  public Class<Object> getKeyClass() {
-    return (Class<Object>) (Class<?>) NullWritable.class;
-  }
-
-  @Override
-  public Class<W> getValueClass() {
-    return serializationClass;
-  }
-
-  @Override
-  public Iterable<W> convertIterableInput(Object key, Iterable<W> value) {
-    return value;
-  }
-}
\ No newline at end of file