You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:08 UTC

[06/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java b/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java
deleted file mode 100644
index e61b98b..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-
-/**
- * Utilities for converting between {@code PType}s from different
- * {@code PTypeFamily} implementations.
- * 
- */
-public class PTypeUtils {
-
-  public static <T> PType<T> convert(PType<T> ptype, PTypeFamily tf) {
-    if (ptype instanceof PTableType) {
-      PTableType ptt = (PTableType) ptype;
-      return tf.tableOf(tf.as(ptt.getKeyType()), tf.as(ptt.getValueType()));
-    }
-    Class<T> typeClass = ptype.getTypeClass();
-    if (Tuple.class.isAssignableFrom(typeClass)) {
-      List<PType> subTypes = ptype.getSubTypes();
-      if (Pair.class.equals(typeClass)) {
-        return tf.pairs(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)));
-      } else if (Tuple3.class.equals(typeClass)) {
-        return tf.triples(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)), tf.as(subTypes.get(2)));
-      } else if (Tuple4.class.equals(typeClass)) {
-        return tf.quads(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)), tf.as(subTypes.get(2)), tf.as(subTypes.get(3)));
-      } else if (TupleN.class.equals(typeClass)) {
-        PType[] newPTypes = subTypes.toArray(new PType[0]);
-        for (int i = 0; i < newPTypes.length; i++) {
-          newPTypes[i] = tf.as(subTypes.get(i));
-        }
-        return (PType<T>) tf.tuples(newPTypes);
-      }
-    }
-    if (Collection.class.isAssignableFrom(typeClass)) {
-      return tf.collections(tf.as(ptype.getSubTypes().get(0)));
-    }
-    return tf.records(typeClass);
-  }
-
-  private PTypeUtils() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PTypes.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypes.java b/crunch/src/main/java/org/apache/crunch/types/PTypes.java
deleted file mode 100644
index 546719c..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/PTypes.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.MapFn;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-/**
- * Utility functions for creating common types of derived PTypes, e.g., for JSON
- * data, protocol buffers, and Thrift records.
- * 
- */
-public class PTypes {
-
-  public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
-    return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
-  }
-
-  public static PType<UUID> uuid(PTypeFamily ptf) {
-    return ptf.derived(UUID.class, BYTE_TO_UUID, UUID_TO_BYTE, ptf.bytes());
-  }
-  
-  public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily
-        .derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings());
-  }
-
-  public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes());
-  }
-
-  public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes());
-  }
-
-  public static final <T extends Enum> PType<T> enums(final Class<T> type, PTypeFamily typeFamily) {
-    return typeFamily.derived(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(), typeFamily.strings());
-  }
-
-  public static MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() {
-    public BigInteger map(ByteBuffer input) {
-      return input == null ? null : new BigInteger(input.array());
-    }
-  };
-
-  public static MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new MapFn<BigInteger, ByteBuffer>() {
-    public ByteBuffer map(BigInteger input) {
-      return input == null ? null : ByteBuffer.wrap(input.toByteArray());
-    }
-  };
-
-  private static class JacksonInputMapFn<T> extends MapFn<String, T> {
-
-    private final Class<T> clazz;
-    private transient ObjectMapper mapper;
-
-    public JacksonInputMapFn(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-
-    @Override
-    public void initialize() {
-      this.mapper = new ObjectMapper();
-    }
-
-    @Override
-    public T map(String input) {
-      try {
-        return mapper.readValue(input, clazz);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  private static class JacksonOutputMapFn<T> extends MapFn<T, String> {
-
-    private transient ObjectMapper mapper;
-
-    @Override
-    public void initialize() {
-      this.mapper = new ObjectMapper();
-    }
-
-    @Override
-    public String map(T input) {
-      try {
-        return mapper.writeValueAsString(input);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  private static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
-
-    private final Class<T> clazz;
-    private transient T instance;
-
-    public ProtoInputMapFn(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-
-    @Override
-    public void initialize() {
-      this.instance = Protos.getDefaultInstance(clazz);
-    }
-
-    @Override
-    public T map(ByteBuffer bb) {
-      try {
-        return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build();
-      } catch (InvalidProtocolBufferException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  private static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> {
-
-    public ProtoOutputMapFn() {
-    }
-
-    @Override
-    public ByteBuffer map(T proto) {
-      return ByteBuffer.wrap(proto.toByteArray());
-    }
-  }
-
-  private static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> {
-
-    private final Class<T> clazz;
-    private transient T instance;
-    private transient TDeserializer deserializer;
-    private transient byte[] bytes;
-
-    public ThriftInputMapFn(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-
-    @Override
-    public void initialize() {
-      this.instance = ReflectionUtils.newInstance(clazz, null);
-      this.deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-      this.bytes = new byte[0];
-    }
-
-    @Override
-    public T map(ByteBuffer bb) {
-      T next = (T) instance.deepCopy();
-      int len = bb.limit() - bb.position();
-      if (len != bytes.length) {
-        bytes = new byte[len];
-      }
-      System.arraycopy(bb.array(), bb.position(), bytes, 0, len);
-      try {
-        deserializer.deserialize(next, bytes);
-      } catch (TException e) {
-        throw new CrunchRuntimeException(e);
-      }
-      return next;
-    }
-  }
-
-  private static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> {
-
-    private transient TSerializer serializer;
-
-    public ThriftOutputMapFn() {
-    }
-
-    @Override
-    public void initialize() {
-      this.serializer = new TSerializer(new TBinaryProtocol.Factory());
-    }
-
-    @Override
-    public ByteBuffer map(T t) {
-      try {
-        return ByteBuffer.wrap(serializer.serialize(t));
-      } catch (TException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  private static class EnumInputMapper<T extends Enum> extends MapFn<String, T> {
-    private final Class<T> type;
-
-    public EnumInputMapper(Class<T> type) {
-      this.type = type;
-    }
-
-    @Override
-    public T map(String input) {
-      return (T) Enum.valueOf(type, input);
-    }
-  };
-
-  private static class EnumOutputMapper<T extends Enum> extends MapFn<T, String> {
-
-    @Override
-    public String map(T input) {
-      return input.name();
-    }
-  };
-  
-  private static MapFn<ByteBuffer, UUID> BYTE_TO_UUID = new MapFn<ByteBuffer, UUID>() {
-    @Override
-    public UUID map(ByteBuffer input) {
-      return new UUID(input.getLong(), input.getLong());
-    }
-  };
-  
-  private static MapFn<UUID, ByteBuffer> UUID_TO_BYTE = new MapFn<UUID, ByteBuffer>() {
-    @Override
-    public ByteBuffer map(UUID input) {
-      ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
-      bb.asLongBuffer().put(input.getMostSignificantBits()).put(input.getLeastSignificantBits());
-      return bb;
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/Protos.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/Protos.java b/crunch/src/main/java/org/apache/crunch/types/Protos.java
deleted file mode 100644
index 4cd5068..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/Protos.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.base.Splitter;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-
-/**
- * Utility functions for working with protocol buffers in Crunch.
- */
-public class Protos {
-
-  /**
-   * Utility function for creating a default PB Messgae from a Class object that
-   * works with both protoc 2.3.0 and 2.4.x.
-   * @param clazz The class of the protocol buffer to create
-   * @return An instance of a protocol buffer
-   */
-  public static <M extends Message> M getDefaultInstance(Class<M> clazz) {
-    if (clazz.getConstructors().length > 0) {
-      // Protobuf 2.3.0
-      return ReflectionUtils.newInstance(clazz, null);
-    } else {
-      // Protobuf 2.4.x
-      try {
-        Message.Builder mb = (Message.Builder) clazz.getDeclaredMethod("newBuilder").invoke(null);
-        return (M) mb.getDefaultInstanceForType();
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }  
-    }
-  }
-  
-  public static <M extends Message, K> MapFn<M, K> extractKey(String fieldName) {
-    return new ExtractKeyFn<M, K>(fieldName);
-  }
-
-  public static <M extends Message> DoFn<String, M> lineParser(String sep, Class<M> msgClass) {
-    return new TextToProtoFn<M>(sep, msgClass);
-  }
-
-  private static class ExtractKeyFn<M extends Message, K> extends MapFn<M, K> {
-
-    private final String fieldName;
-
-    private transient FieldDescriptor fd;
-
-    public ExtractKeyFn(String fieldName) {
-      this.fieldName = fieldName;
-    }
-
-    @Override
-    public K map(M input) {
-      if (input == null) {
-        throw new IllegalArgumentException("Null inputs not supported by Protos.ExtractKeyFn");
-      } else if (fd == null) {
-        fd = input.getDescriptorForType().findFieldByName(fieldName);
-        if (fd == null) {
-          throw new IllegalStateException("Could not find field: " + fieldName + " in message: " + input);
-        }
-      }
-      return (K) input.getField(fd);
-    }
-
-  }
-
-  private static class TextToProtoFn<M extends Message> extends DoFn<String, M> {
-
-    private final String sep;
-    private final Class<M> msgClass;
-
-    private transient M msgInstance;
-    private transient List<FieldDescriptor> fields;
-    private transient Splitter splitter;
-
-    enum ParseErrors {
-      TOTAL,
-      NUMBER_FORMAT
-    };
-
-    public TextToProtoFn(String sep, Class<M> msgClass) {
-      this.sep = sep;
-      this.msgClass = msgClass;
-    }
-
-    @Override
-    public void initialize() {
-      this.msgInstance = getDefaultInstance(msgClass);
-      this.fields = msgInstance.getDescriptorForType().getFields();
-      this.splitter = Splitter.on(sep);
-    }
-
-    @Override
-    public void process(String input, Emitter<M> emitter) {
-      if (input != null && !input.isEmpty()) {
-        Builder b = msgInstance.newBuilderForType();
-        Iterator<String> iter = splitter.split(input).iterator();
-        boolean parseError = false;
-        for (FieldDescriptor fd : fields) {
-          if (iter.hasNext()) {
-            String value = iter.next();
-            if (value != null && !value.isEmpty()) {
-              Object parsedValue = null;
-              try {
-                switch (fd.getJavaType()) {
-                case STRING:
-                  parsedValue = value;
-                  break;
-                case INT:
-                  parsedValue = Integer.valueOf(value);
-                  break;
-                case LONG:
-                  parsedValue = Long.valueOf(value);
-                  break;
-                case FLOAT:
-                  parsedValue = Float.valueOf(value);
-                  break;
-                case DOUBLE:
-                  parsedValue = Double.valueOf(value);
-                  break;
-                case BOOLEAN:
-                  parsedValue = Boolean.valueOf(value);
-                  break;
-                case ENUM:
-                  parsedValue = fd.getEnumType().findValueByName(value);
-                  break;
-                }
-                b.setField(fd, parsedValue);
-              } catch (NumberFormatException nfe) {
-                increment(ParseErrors.NUMBER_FORMAT);
-                parseError = true;
-                break;
-              }
-            }
-          }
-        }
-
-        if (parseError) {
-          increment(ParseErrors.TOTAL);
-        } else {
-          emitter.emit((M) b.build());
-        }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
deleted file mode 100644
index a2ffae3..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.List;
-
-import org.apache.crunch.Tuple;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Lists;
-
-/**
- * Performs deep copies (based on underlying PType deep copying) of Tuple-based objects.
- * 
- * @param <T> The type of Tuple implementation being copied
- */
-public class TupleDeepCopier<T extends Tuple> implements DeepCopier<T> {
-
-  private final TupleFactory<T> tupleFactory;
-  private final List<PType> elementTypes;
-
-  public TupleDeepCopier(Class<T> tupleClass, PType... elementTypes) {
-    tupleFactory = TupleFactory.getTupleFactory(tupleClass);
-    this.elementTypes = Lists.newArrayList(elementTypes);
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    for (PType elementType : elementTypes) {
-      elementType.initialize(conf);
-    }
-  }
-
-  @Override
-  public T deepCopy(T source) {
-    
-    if (source == null) {
-      return null;
-    }
-    
-    Object[] deepCopyValues = new Object[source.size()];
-
-    for (int valueIndex = 0; valueIndex < elementTypes.size(); valueIndex++) {
-      PType elementType = elementTypes.get(valueIndex);
-      deepCopyValues[valueIndex] = elementType.getDetachedValue(source.get(valueIndex));
-    }
-
-    return tupleFactory.makeTuple(deepCopyValues);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
deleted file mode 100644
index 73b47de..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-
-import com.google.common.collect.Maps;
-
-public abstract class TupleFactory<T extends Tuple> implements Serializable {
-
-  public void initialize() {
-  }
-
-  public abstract T makeTuple(Object... values);
-
-  
-  private static final Map<Class, TupleFactory> customTupleFactories = Maps.newHashMap();
-  
-  /**
-   * Get the {@link TupleFactory} for a given Tuple implementation.
-   * 
-   * @param tupleClass
-   *          The class for which the factory is to be retrieved
-   * @return The appropriate TupleFactory
-   */
-  public static <T extends Tuple> TupleFactory<T> getTupleFactory(Class<T> tupleClass) {
-    if (tupleClass == Pair.class) {
-      return (TupleFactory<T>) PAIR;
-    } else if (tupleClass == Tuple3.class) {
-      return (TupleFactory<T>) TUPLE3;
-    } else if (tupleClass == Tuple4.class) {
-      return (TupleFactory<T>) TUPLE4;
-    } else if (tupleClass == TupleN.class) {
-      return (TupleFactory<T>) TUPLEN;
-    } else if (customTupleFactories.containsKey(tupleClass)) {
-      return (TupleFactory<T>) customTupleFactories.get(tupleClass);
-    } else {
-      throw new IllegalArgumentException("Can't create TupleFactory for " + tupleClass);
-    }
-  }
-
-  public static final TupleFactory<Pair> PAIR = new TupleFactory<Pair>() {
-    @Override
-    public Pair makeTuple(Object... values) {
-      return Pair.of(values[0], values[1]);
-    }
-  };
-
-  public static final TupleFactory<Tuple3> TUPLE3 = new TupleFactory<Tuple3>() {
-    @Override
-    public Tuple3 makeTuple(Object... values) {
-      return Tuple3.of(values[0], values[1], values[2]);
-    }
-  };
-
-  public static final TupleFactory<Tuple4> TUPLE4 = new TupleFactory<Tuple4>() {
-    @Override
-    public Tuple4 makeTuple(Object... values) {
-      return Tuple4.of(values[0], values[1], values[2], values[3]);
-    }
-  };
-
-  public static final TupleFactory<TupleN> TUPLEN = new TupleFactory<TupleN>() {
-    @Override
-    public TupleN makeTuple(Object... values) {
-      return new TupleN(values);
-    }
-  };
-
-  public static <T extends Tuple> TupleFactory<T> create(Class<T> clazz, Class... typeArgs) {
-    if (customTupleFactories.containsKey(clazz)) {
-      return (TupleFactory<T>) customTupleFactories.get(clazz);
-    }
-    TupleFactory<T> custom = new CustomTupleFactory<T>(clazz, typeArgs);
-    customTupleFactories.put(clazz, custom);
-    return custom;
-  }
-
-  private static class CustomTupleFactory<T extends Tuple> extends TupleFactory<T> {
-
-    private final Class<T> clazz;
-    private final Class[] typeArgs;
-
-    private transient Constructor<T> constructor;
-
-    public CustomTupleFactory(Class<T> clazz, Class[] typeArgs) {
-      this.clazz = clazz;
-      this.typeArgs = typeArgs;
-    }
-
-    @Override
-    public void initialize() {
-      try {
-        constructor = clazz.getConstructor(typeArgs);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-
-    @Override
-    public T makeTuple(Object... values) {
-      try {
-        return constructor.newInstance(values);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
deleted file mode 100644
index cc1636c..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-
-import com.google.common.collect.Lists;
-
-/**
- * Determines the capabilities of the Avro version that is currently being used.
- */
-class AvroCapabilities {
-
-  public static class Record extends org.apache.avro.specific.SpecificRecordBase implements
-      org.apache.avro.specific.SpecificRecord {
-    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser()
-        .parse("{\"type\":\"record\",\"name\":\"Record\",\"namespace\":\"org.apache.crunch.types.avro\",\"fields\":[{\"name\":\"subrecords\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}");
-    @Deprecated
-    public java.util.List<java.lang.CharSequence> subrecords;
-
-    public java.lang.Object get(int field$) {
-      switch (field$) {
-      case 0:
-        return subrecords;
-      default:
-        throw new org.apache.avro.AvroRuntimeException("Bad index");
-      }
-    }
-
-    // Used by DatumReader. Applications should not call.
-    @SuppressWarnings(value = "unchecked")
-    public void put(int field$, java.lang.Object value$) {
-      switch (field$) {
-      case 0:
-        subrecords = (java.util.List<java.lang.CharSequence>) value$;
-        break;
-      default:
-        throw new org.apache.avro.AvroRuntimeException("Bad index");
-      }
-    }
-
-    @Override
-    public Schema getSchema() {
-      return SCHEMA$;
-    }
-  }
-
-  /**
-   * Determine if the current Avro version can use the ReflectDatumReader to
-   * read SpecificData that includes an array. The inability to do this was a
-   * bug that was fixed in Avro 1.7.0.
-   * 
-   * @return true if SpecificData can be properly read using a
-   *         ReflectDatumReader
-   */
-  static boolean canDecodeSpecificSchemaWithReflectDatumReader() {
-    ReflectDatumReader<Record> datumReader = new ReflectDatumReader(Record.SCHEMA$);
-    ReflectDatumWriter<Record> datumWriter = new ReflectDatumWriter(Record.SCHEMA$);
-
-    Record record = new Record();
-    record.subrecords = Lists.<CharSequence> newArrayList("a", "b");
-
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
-
-    try {
-      datumWriter.write(record, encoder);
-      encoder.flush();
-      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
-          byteArrayOutputStream.toByteArray(), null);
-      datumReader.read(record, decoder);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Error performing specific schema test", ioe);
-    } catch (ClassCastException cce) {
-      // This indicates that we're using a pre-1.7.0 version of Avro, as the
-      // ReflectDatumReader in those versions could not correctly handle an
-      // array in a SpecificData value
-      return false;
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
deleted file mode 100644
index 0fe9288..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.ByteArrayOutputStream;
-import java.io.Serializable;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.types.DeepCopier;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Performs deep copies of Avro-serializable objects.
- * <p>
- * <b>Warning:</b> Methods in this class are not thread-safe. This shouldn't be a problem when
- * running in a map-reduce context where each mapper/reducer is running in its own JVM, but it may
- * well be a problem in any other kind of multi-threaded context.
- */
-abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
-
-  private String jsonSchema;
-  private transient Configuration conf;
-  private transient Schema schema;
-  private BinaryEncoder binaryEncoder;
-  private BinaryDecoder binaryDecoder;
-
-  private transient DatumWriter<T> datumWriter;
-  private transient DatumReader<T> datumReader;
-
-  public AvroDeepCopier(Schema schema) {
-    this.jsonSchema = schema.toString();
-  }
-
-  protected Schema getSchema() {
-    if (schema == null) {
-      schema = new Schema.Parser().parse(jsonSchema);
-    }
-    return schema;
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    this.conf = conf;
-  }
-
-  protected abstract T createCopyTarget();
-
-  protected abstract DatumWriter<T> createDatumWriter(Configuration conf);
-
-  protected abstract DatumReader<T> createDatumReader(Configuration conf);
-
-  /**
-   * Deep copier for Avro specific data objects.
-   */
-  public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> {
-
-    private Class<T> valueClass;
-
-    public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
-      super(schema);
-      this.valueClass = valueClass;
-    }
-
-    @Override
-    protected T createCopyTarget() {
-      return createNewInstance(valueClass);
-    }
-
-    @Override
-    protected DatumWriter<T> createDatumWriter(Configuration conf) {
-      return new SpecificDatumWriter<T>(getSchema());
-    }
-
-    @Override
-    protected DatumReader<T> createDatumReader(Configuration conf) {
-      return new SpecificDatumReader<T>(getSchema());
-    }
-
-  }
-
-  /**
-   * Deep copier for Avro generic data objects.
-   */
-  public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
-
-    private transient Schema schema;
-
-    public AvroGenericDeepCopier(Schema schema) {
-      super(schema);
-    }
-
-    @Override
-    protected Record createCopyTarget() {
-      return new GenericData.Record(getSchema());
-    }
-
-    @Override
-    protected DatumReader<Record> createDatumReader(Configuration conf) {
-      return new GenericDatumReader<Record>(getSchema());
-    }
-
-    @Override
-    protected DatumWriter<Record> createDatumWriter(Configuration conf) {
-      return new GenericDatumWriter<Record>(getSchema());
-    }
-  }
-
-  /**
-   * Deep copier for Avro reflect data objects.
-   */
-  public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
-
-    private Class<T> valueClass;
-
-    public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
-      super(schema);
-      this.valueClass = valueClass;
-    }
-
-    @Override
-    protected T createCopyTarget() {
-      return createNewInstance(valueClass);
-    }
-
-    @Override
-    protected DatumReader<T> createDatumReader(Configuration conf) {
-      return Avros.getReflectDataFactory(conf).getReader(getSchema());
-    }
-
-    @Override
-    protected DatumWriter<T> createDatumWriter(Configuration conf) {
-      return Avros.getReflectDataFactory(conf).getWriter(getSchema());
-    }
-  }
-
-  /**
-   * Create a deep copy of an Avro value.
-   * 
-   * @param source The value to be copied
-   * @return The deep copy of the value
-   */
-  @Override
-  public T deepCopy(T source) {
-    
-    if (source == null) {
-      return null;
-    }
-    
-    if (datumReader == null) {
-      datumReader = createDatumReader(conf);
-    }
-    if (datumWriter == null) {
-      datumWriter = createDatumWriter(conf);
-    }
-    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
-    binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
-    T target = createCopyTarget();
-    try {
-      datumWriter.write(source, binaryEncoder);
-      binaryEncoder.flush();
-      binaryDecoder = DecoderFactory.get()
-          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
-      datumReader.read(target, binaryDecoder);
-    } catch (Exception e) {
-      throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
-    }
-
-    return target;
-  }
-
-  protected T createNewInstance(Class<T> targetClass) {
-    try {
-      return targetClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new CrunchRuntimeException(e);
-    } catch (IllegalAccessException e) {
-      throw new CrunchRuntimeException(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
deleted file mode 100644
index 598868f..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.util.Collection;
-
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroKeyComparator;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.fn.PairMapFn;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- *
- *
- */
-class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
-
-  private static final AvroPairConverter CONVERTER = new AvroPairConverter();
-  private final MapFn inputFn;
-  private final MapFn outputFn;
-
-  public AvroGroupedTableType(AvroTableType<K, V> tableType) {
-    super(tableType);
-    AvroType keyType = (AvroType) tableType.getKeyType();
-    AvroType valueType = (AvroType) tableType.getValueType();
-    this.inputFn = new PairIterableMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
-    this.outputFn = new PairMapFn(keyType.getOutputMapFn(), valueType.getOutputMapFn());
-  }
-
-  @Override
-  public Class<Pair<K, Iterable<V>>> getTypeClass() {
-    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();
-  }
-
-  @Override
-  public Converter getGroupingConverter() {
-    return CONVERTER;
-  }
-
-  @Override
-  public MapFn getInputMapFn() {
-    return inputFn;
-  }
-
-  @Override
-  public MapFn getOutputMapFn() {
-    return outputFn;
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    getTableType().initialize(conf);
-  }
-
-  @Override
-  public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
-    return PTables.getGroupedDetachedValue(this, value);
-  }
-
-  @Override
-  public void configureShuffle(Job job, GroupingOptions options) {
-    AvroTableType<K, V> att = (AvroTableType<K, V>) tableType;
-    String schemaJson = att.getSchema().toString();
-    Configuration conf = job.getConfiguration();
-
-    if (att.hasReflect()) {
-      if (att.hasSpecific()) {
-        Avros.checkCombiningSpecificAndReflectionSchemas();
-      }
-      conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
-    }
-    conf.set(AvroJob.MAP_OUTPUT_SCHEMA, schemaJson);
-    job.setSortComparatorClass(AvroKeyComparator.class);
-    job.setMapOutputKeyClass(AvroKey.class);
-    job.setMapOutputValueClass(AvroValue.class);
-    if (options != null) {
-      options.configure(job);
-    }
-
-    Avros.configureReflectDataFactory(conf);
-
-    Collection<String> serializations = job.getConfiguration().getStringCollection(
-        "io.serializations");
-    if (!serializations.contains(SafeAvroSerialization.class.getName())) {
-      serializations.add(SafeAvroSerialization.class.getName());
-      job.getConfiguration().setStrings("io.serializations", serializations.toArray(new String[0]));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
deleted file mode 100644
index b8bbebd..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */
-public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
-  @Override
-  public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    context.setStatus(split.toString());
-    String jsonSchema = context.getConfiguration().get(AvroJob.INPUT_SCHEMA);
-    Schema schema = new Schema.Parser().parse(jsonSchema);
-    return new AvroRecordReader<T>(schema);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
deleted file mode 100644
index 68b717d..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.crunch.types.Converter;
-import org.apache.hadoop.io.NullWritable;
-
-class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K, Iterable<K>> {
-
-  private transient AvroWrapper<K> wrapper = null;
-
-  @Override
-  public K convertInput(AvroWrapper<K> key, NullWritable value) {
-    return key.datum();
-  }
-
-  @Override
-  public AvroWrapper<K> outputKey(K value) {
-    getWrapper().datum(value);
-    return wrapper;
-  }
-
-  @Override
-  public NullWritable outputValue(K value) {
-    return NullWritable.get();
-  }
-
-  @Override
-  public Class<AvroWrapper<K>> getKeyClass() {
-    return (Class<AvroWrapper<K>>) getWrapper().getClass();
-  }
-
-  @Override
-  public Class<NullWritable> getValueClass() {
-    return NullWritable.class;
-  }
-
-  private AvroWrapper<K> getWrapper() {
-    if (wrapper == null) {
-      wrapper = new AvroWrapper<K>();
-    }
-    return wrapper;
-  }
-
-  @Override
-  public Iterable<K> convertIterableInput(AvroWrapper<K> key, Iterable<NullWritable> value) {
-    throw new UnsupportedOperationException("Should not be possible");
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
deleted file mode 100644
index 98d3f50..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */
-public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
-
-  @Override
-  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException,
-      InterruptedException {
-
-    Configuration conf = context.getConfiguration();
-    Schema schema = null;
-    String outputName = conf.get("crunch.namedoutput");
-    if (outputName != null && !outputName.isEmpty()) {
-      schema = (new Schema.Parser()).parse(conf.get("avro.output.schema." + outputName));
-    } else {
-      schema = AvroJob.getOutputSchema(context.getConfiguration());
-    }
-
-    ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-    final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T> getWriter(schema));
-
-    JobConf jc = new JobConf(conf);
-    /* copied from org.apache.avro.mapred.AvroOutputFormat */
-    
-    if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jc)) {
-      int level = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
-          org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
-      String codecName = conf.get(AvroJob.OUTPUT_CODEC, 
-          org.apache.avro.file.DataFileConstants.DEFLATE_CODEC);
-      CodecFactory codec = codecName.equals(org.apache.avro.file.DataFileConstants.DEFLATE_CODEC)
-          ? CodecFactory.deflateCodec(level)
-          : CodecFactory.fromString(codecName);
-      WRITER.setCodec(codec);
-    }
-
-    WRITER.setSyncInterval(jc.getInt(org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY, 
-        org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL));
-
-    Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT);
-    WRITER.create(schema, path.getFileSystem(context.getConfiguration()).create(path));
-    
-    return new RecordWriter<AvroWrapper<T>, NullWritable>() {
-      @Override
-      public void write(AvroWrapper<T> wrapper, NullWritable ignore) throws IOException {
-        WRITER.append(wrapper.datum());
-      }
-
-      @Override
-      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-        WRITER.close();
-      }
-    };
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
deleted file mode 100644
index d1d2627..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.util.Iterator;
-
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.Converter;
-
-class AvroPairConverter<K, V> implements Converter<AvroKey<K>, AvroValue<V>, Pair<K, V>, Pair<K, Iterable<V>>> {
-
-  private transient AvroKey<K> keyWrapper = null;
-  private transient AvroValue<V> valueWrapper = null;
-
-  @Override
-  public Pair<K, V> convertInput(AvroKey<K> key, AvroValue<V> value) {
-    return Pair.of(key.datum(), value.datum());
-  }
-
-  public Pair<K, Iterable<V>> convertIterableInput(AvroKey<K> key, Iterable<AvroValue<V>> iter) {
-    Iterable<V> it = new AvroWrappedIterable<V>(iter);
-    return Pair.of(key.datum(), it);
-  }
-
-  @Override
-  public AvroKey<K> outputKey(Pair<K, V> value) {
-    getKeyWrapper().datum(value.first());
-    return keyWrapper;
-  }
-
-  @Override
-  public AvroValue<V> outputValue(Pair<K, V> value) {
-    getValueWrapper().datum(value.second());
-    return valueWrapper;
-  }
-
-  @Override
-  public Class<AvroKey<K>> getKeyClass() {
-    return (Class<AvroKey<K>>) getKeyWrapper().getClass();
-  }
-
-  @Override
-  public Class<AvroValue<V>> getValueClass() {
-    return (Class<AvroValue<V>>) getValueWrapper().getClass();
-  }
-
-  private AvroKey<K> getKeyWrapper() {
-    if (keyWrapper == null) {
-      keyWrapper = new AvroKey<K>();
-    }
-    return keyWrapper;
-  }
-
-  private AvroValue<V> getValueWrapper() {
-    if (valueWrapper == null) {
-      valueWrapper = new AvroValue<V>();
-    }
-    return valueWrapper;
-  }
-
-  private static class AvroWrappedIterable<V> implements Iterable<V> {
-
-    private final Iterable<AvroValue<V>> iters;
-
-    public AvroWrappedIterable(Iterable<AvroValue<V>> iters) {
-      this.iters = iters;
-    }
-
-    @Override
-    public Iterator<V> iterator() {
-      return new Iterator<V>() {
-        private final Iterator<AvroValue<V>> it = iters.iterator();
-
-        @Override
-        public boolean hasNext() {
-          return it.hasNext();
-        }
-
-        @Override
-        public V next() {
-          return it.next().datum();
-        }
-
-        @Override
-        public void remove() {
-          it.remove();
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
deleted file mode 100644
index 9c7578c..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/** An {@link RecordReader} for Avro data files. */
-class AvroRecordReader<T> extends RecordReader<AvroWrapper<T>, NullWritable> {
-
-  private FileReader<T> reader;
-  private long start;
-  private long end;
-  private AvroWrapper<T> key;
-  private NullWritable value;
-  private Schema schema;
-
-  public AvroRecordReader(Schema schema) {
-    this.schema = schema;
-  }
-
-  @Override
-  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
-    FileSplit split = (FileSplit) genericSplit;
-    Configuration conf = context.getConfiguration();
-    SeekableInput in = new FsInput(split.getPath(), conf);
-    DatumReader<T> datumReader = null;
-    if (context.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)) {
-      ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-      datumReader = factory.getReader(schema);
-    } else {
-      datumReader = new SpecificDatumReader<T>(schema);
-    }
-    this.reader = DataFileReader.openReader(in, datumReader);
-    reader.sync(split.getStart()); // sync to start
-    this.start = reader.tell();
-    this.end = split.getStart() + split.getLength();
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (!reader.hasNext() || reader.pastSync(end)) {
-      key = null;
-      value = null;
-      return false;
-    }
-    if (key == null) {
-      key = new AvroWrapper<T>();
-    }
-    if (value == null) {
-      value = NullWritable.get();
-    }
-    key.datum(reader.next(key.datum()));
-    return true;
-  }
-
-  @Override
-  public AvroWrapper<T> getCurrentKey() throws IOException, InterruptedException {
-    return key;
-  }
-
-  @Override
-  public NullWritable getCurrentValue() throws IOException, InterruptedException {
-    return value;
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    if (end == start) {
-      return 0.0f;
-    } else {
-      return Math.min(1.0f, (getPos() - start) / (float) (end - start));
-    }
-  }
-
-  public long getPos() throws IOException {
-    return reader.tell();
-  }
-
-  @Override
-  public void close() throws IOException {
-    reader.close();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
deleted file mode 100644
index 86613df..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.TupleDeepCopier;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-/**
- * The implementation of the PTableType interface for Avro-based serialization.
- * 
- */
-class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K, V> {
-
-  private static class PairToAvroPair extends MapFn<Pair, org.apache.avro.mapred.Pair> {
-    private final MapFn keyMapFn;
-    private final MapFn valueMapFn;
-    private final String firstJson;
-    private final String secondJson;
-
-    private String pairSchemaJson;
-    private transient Schema pairSchema;
-
-    public PairToAvroPair(AvroType keyType, AvroType valueType) {
-      this.keyMapFn = keyType.getOutputMapFn();
-      this.firstJson = keyType.getSchema().toString();
-      this.valueMapFn = valueType.getOutputMapFn();
-      this.secondJson = valueType.getSchema().toString();
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      keyMapFn.configure(conf);
-      valueMapFn.configure(conf);
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      keyMapFn.setContext(context);
-      valueMapFn.setContext(context);
-    }
-    
-    @Override
-    public void initialize() {
-      keyMapFn.initialize();
-      valueMapFn.initialize();
-      pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
-          new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString();
-    }
-
-    @Override
-    public org.apache.avro.mapred.Pair map(Pair input) {
-      if (pairSchema == null) {
-        pairSchema = new Schema.Parser().parse(pairSchemaJson);
-      }
-      org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(pairSchema);
-      avroPair.key(keyMapFn.map(input.first()));
-      avroPair.value(valueMapFn.map(input.second()));
-      return avroPair;
-    }
-  }
-
-  private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> {
-
-    private final MapFn firstMapFn;
-    private final MapFn secondMapFn;
-
-    public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
-      this.firstMapFn = firstMapFn;
-      this.secondMapFn = secondMapFn;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      firstMapFn.configure(conf);
-      secondMapFn.configure(conf);
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      firstMapFn.setContext(context);
-      secondMapFn.setContext(context);
-    }
-    
-    @Override
-    public void initialize() {
-      firstMapFn.initialize();
-      secondMapFn.initialize();
-    }
-
-    @Override
-    public Pair map(IndexedRecord input) {
-      return Pair.of(firstMapFn.map(input.get(0)), secondMapFn.map(input.get(1)));
-    }
-  }
-
-  private final AvroType<K> keyType;
-  private final AvroType<V> valueType;
-
-  public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
-    super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(),
-        valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(),
-        valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier(
-        Pair.class, keyType, valueType), keyType, valueType);
-    this.keyType = keyType;
-    this.valueType = valueType;
-  }
-
-  @Override
-  public PType<K> getKeyType() {
-    return keyType;
-  }
-
-  @Override
-  public PType<V> getValueType() {
-    return valueType;
-  }
-
-  @Override
-  public PGroupedTableType<K, V> getGroupedTableType() {
-    return new AvroGroupedTableType<K, V>(this);
-  }
-
-  @Override
-  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
-    return PTables.getDetachedValue(this, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
deleted file mode 100644
index 4930235..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-public class AvroTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
-  class DatumRecordTextWriter extends RecordWriter<K, V> {
-    private RecordWriter lineRecordWriter;
-
-    public DatumRecordTextWriter(RecordWriter recordWriter) {
-      this.lineRecordWriter = recordWriter;
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-      lineRecordWriter.close(context);
-    }
-
-    @Override
-    public void write(K arg0, V arg1) throws IOException, InterruptedException {
-      lineRecordWriter.write(getData(arg0), getData(arg1));
-    }
-
-    private Object getData(Object o) {
-      Object data = o;
-      if (o instanceof AvroWrapper) {
-        data = ((AvroWrapper) o).datum();
-      }
-      return data;
-    }
-  }
-
-  @Override
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-    RecordWriter<K, V> recordWriter = super.getRecordWriter(context);
-    return new DatumRecordTextWriter(recordWriter);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
deleted file mode 100644
index a92b0d0..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.io.avro.AvroFileSourceTarget;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.DeepCopier;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * The implementation of the PType interface for Avro-based serialization.
- * 
- */
-public class AvroType<T> implements PType<T> {
-
-  private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
-
-  private final Class<T> typeClass;
-  private final String schemaString;
-  private transient Schema schema;
-  private final MapFn baseInputMapFn;
-  private final MapFn baseOutputMapFn;
-  private final List<PType> subTypes;
-  private DeepCopier<T> deepCopier;
-  private boolean initialized = false;
-
-  public AvroType(Class<T> typeClass, Schema schema, DeepCopier<T> deepCopier, PType... ptypes) {
-    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes);
-  }
-
-  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
-      DeepCopier<T> deepCopier, PType... ptypes) {
-    this.typeClass = typeClass;
-    this.schema = Preconditions.checkNotNull(schema);
-    this.schemaString = schema.toString();
-    this.baseInputMapFn = inputMapFn;
-    this.baseOutputMapFn = outputMapFn;
-    this.deepCopier = deepCopier;
-    this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
-  }
-
-  @Override
-  public Class<T> getTypeClass() {
-    return typeClass;
-  }
-
-  @Override
-  public PTypeFamily getFamily() {
-    return AvroTypeFamily.getInstance();
-  }
-
-  @Override
-  public List<PType> getSubTypes() {
-    return Lists.<PType> newArrayList(subTypes);
-  }
-
-  public Schema getSchema() {
-    if (schema == null) {
-      schema = new Schema.Parser().parse(schemaString);
-    }
-    return schema;
-  }
-
-  /**
-   * Determine if the wrapped type is a specific data avro type or wraps one.
-   * 
-   * @return true if the wrapped type is a specific data type or wraps one
-   */
-  public boolean hasSpecific() {
-    if (Avros.isPrimitive(this)) {
-      return false;
-    }
-
-    if (!this.subTypes.isEmpty()) {
-      for (PType<?> subType : this.subTypes) {
-        AvroType<?> atype = (AvroType<?>) subType;
-        if (atype.hasSpecific()) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    return SpecificRecord.class.isAssignableFrom(typeClass);
-  }
-
-  /**
-   * Determine if the wrapped type is a generic data avro type.
-   * 
-   * @return true if the wrapped type is a generic type
-   */
-  public boolean isGeneric() {
-    return GenericData.Record.class.equals(typeClass);
-  }
-
-  /**
-   * Determine if the wrapped type is a reflection-based avro type or wraps one.
-   * 
-   * @return true if the wrapped type is a reflection-based type or wraps one.
-   */
-  public boolean hasReflect() {
-    if (Avros.isPrimitive(this)) {
-      return false;
-    }
-
-    if (!this.subTypes.isEmpty()) {
-      for (PType<?> subType : this.subTypes) {
-        if (((AvroType<?>) subType).hasReflect()) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class
-        .isAssignableFrom(typeClass));
-  }
-
-  public MapFn<Object, T> getInputMapFn() {
-    return baseInputMapFn;
-  }
-
-  public MapFn<T, Object> getOutputMapFn() {
-    return baseOutputMapFn;
-  }
-
-  @Override
-  public Converter getConverter() {
-    return AVRO_CONVERTER;
-  }
-
-  @Override
-  public ReadableSourceTarget<T> getDefaultFileSource(Path path) {
-    return new AvroFileSourceTarget<T>(path, this);
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    deepCopier.initialize(conf);
-    initialized = true;
-  }
-
-  @Override
-  public T getDetachedValue(T value) {
-    if (!initialized) {
-      throw new IllegalStateException("Cannot call getDetachedValue on an uninitialized PType");
-    }
-    return deepCopier.deepCopy(value);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof AvroType)) {
-      return false;
-    }
-    AvroType at = (AvroType) other;
-    return (typeClass.equals(at.typeClass) && subTypes.equals(at.subTypes));
-
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    hcb.append(typeClass).append(subTypes);
-    return hcb.toHashCode();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
deleted file mode 100644
index e09e173..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.PTypeUtils;
-
-public class AvroTypeFamily implements PTypeFamily {
-
-  private static final AvroTypeFamily INSTANCE = new AvroTypeFamily();
-
-  public static AvroTypeFamily getInstance() {
-    return INSTANCE;
-  }
-
-  // There can only be one instance.
-  private AvroTypeFamily() {
-  }
-
-  @Override
-  public PType<Void> nulls() {
-    return Avros.nulls();
-  }
-
-  @Override
-  public PType<String> strings() {
-    return Avros.strings();
-  }
-
-  @Override
-  public PType<Long> longs() {
-    return Avros.longs();
-  }
-
-  @Override
-  public PType<Integer> ints() {
-    return Avros.ints();
-  }
-
-  @Override
-  public PType<Float> floats() {
-    return Avros.floats();
-  }
-
-  @Override
-  public PType<Double> doubles() {
-    return Avros.doubles();
-  }
-
-  @Override
-  public PType<Boolean> booleans() {
-    return Avros.booleans();
-  }
-
-  @Override
-  public PType<ByteBuffer> bytes() {
-    return Avros.bytes();
-  }
-
-  @Override
-  public <T> PType<T> records(Class<T> clazz) {
-    return Avros.records(clazz);
-  }
-
-  public PType<GenericData.Record> generics(Schema schema) {
-    return Avros.generics(schema);
-  }
-
-  public <T> PType<T> containers(Class<T> clazz) {
-    return Avros.containers(clazz);
-  }
-
-  @Override
-  public <T> PType<Collection<T>> collections(PType<T> ptype) {
-    return Avros.collections(ptype);
-  }
-
-  @Override
-  public <T> PType<Map<String, T>> maps(PType<T> ptype) {
-    return Avros.maps(ptype);
-  }
-
-  @Override
-  public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
-    return Avros.pairs(p1, p2);
-  }
-
-  @Override
-  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
-    return Avros.triples(p1, p2, p3);
-  }
-
-  @Override
-  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
-    return Avros.quads(p1, p2, p3, p4);
-  }
-
-  @Override
-  public PType<TupleN> tuples(PType<?>... ptypes) {
-    return Avros.tuples(ptypes);
-  }
-
-  @Override
-  public <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value) {
-    return Avros.tableOf(key, value);
-  }
-
-  @Override
-  public <T> PType<T> as(PType<T> ptype) {
-    if (ptype instanceof AvroType || ptype instanceof AvroGroupedTableType) {
-      return ptype;
-    }
-    if (ptype instanceof PGroupedTableType) {
-      PTableType ptt = ((PGroupedTableType) ptype).getTableType();
-      return new AvroGroupedTableType((AvroTableType) as(ptt));
-    }
-    Class<T> typeClass = ptype.getTypeClass();
-    PType<T> prim = Avros.getPrimitiveType(typeClass);
-    if (prim != null) {
-      return prim;
-    }
-    return PTypeUtils.convert(ptype, this);
-  }
-
-  @Override
-  public <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes) {
-    return Avros.tuples(clazz, ptypes);
-  }
-
-  @Override
-  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
-    return Avros.derived(clazz, inputFn, outputFn, base);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
deleted file mode 100644
index 9460fa5..0000000
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
-
-/**
- * An {@link org.apache.hadoop.mapred.InputFormat} for text files. Each line is
- * a {@link Utf8} key; values are null.
- */
-public class AvroUtf8InputFormat extends FileInputFormat<AvroWrapper<Utf8>, NullWritable> {
-
-  static class Utf8LineRecordReader extends RecordReader<AvroWrapper<Utf8>, NullWritable> {
-
-    private LineRecordReader lineRecordReader;
-
-    private AvroWrapper<Utf8> currentKey = new AvroWrapper<Utf8>();
-
-    public Utf8LineRecordReader() throws IOException {
-      this.lineRecordReader = new LineRecordReader();
-    }
-
-    public void close() throws IOException {
-      lineRecordReader.close();
-    }
-
-    public float getProgress() throws IOException {
-      return lineRecordReader.getProgress();
-    }
-
-    @Override
-    public AvroWrapper<Utf8> getCurrentKey() throws IOException, InterruptedException {
-      Text txt = lineRecordReader.getCurrentValue();
-      currentKey.datum(new Utf8(txt.toString()));
-      return currentKey;
-    }
-
-    @Override
-    public NullWritable getCurrentValue() throws IOException, InterruptedException {
-      return NullWritable.get();
-    }
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-      lineRecordReader.initialize(split, context);
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      return lineRecordReader.nextKeyValue();
-    }
-  }
-
-  private CompressionCodecFactory compressionCodecs = null;
-
-  public void configure(Configuration conf) {
-    compressionCodecs = new CompressionCodecFactory(conf);
-  }
-
-  protected boolean isSplitable(FileSystem fs, Path file) {
-    return compressionCodecs.getCodec(file) == null;
-  }
-
-  @Override
-  public RecordReader<AvroWrapper<Utf8>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new Utf8LineRecordReader();
-  }
-}