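You are viewing a plain-text version of this content. The canonical version is the original message in the commits@crunch.apache.org mailing-list archive (its hyperlink was lost in this plain-text rendering).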
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/06/29 23:30:30 UTC

git commit: Improve compatibility with Avro ReflectDatumReader

Updated Branches:
  refs/heads/master 76c501568 -> fe53b71fe


Improve compatibility with Avro ReflectDatumReader

Allow Avro-based readers to correctly choose among
ReflectDatumReader, GenericDatumReader, and SpecificDatumReader,
so that POJOs can be used throughout pipelines.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/fe53b71f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/fe53b71f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/fe53b71f

Branch: refs/heads/master
Commit: fe53b71fe3ecf049eaa5fc3cfc99c8fa0cdd5169
Parents: 76c5015
Author: Gabriel Reid <ga...@gmail.com>
Authored: Fri Jun 29 23:26:20 2012 +0200
Committer: Gabriel Reid <ga...@gmail.com>
Committed: Fri Jun 29 23:26:20 2012 +0200

----------------------------------------------------------------------
 .../crunch/io/avro/AvroFileReaderFactory.java      |   83 +-
 .../cloudera/crunch/types/avro/AvroTableType.java  |  246 ++--
 .../com/cloudera/crunch/types/avro/AvroType.java   |   34 +-
 .../java/com/cloudera/crunch/types/avro/Avros.java | 1082 ++++++++-------
 .../crunch/io/avro/AvroFileReaderFactoryTest.java  |   67 +-
 .../crunch/io/avro/AvroFileSourceTargetTest.java   |   60 +-
 .../cloudera/crunch/io/avro/AvroReflectTest.java   |   98 ++
 .../cloudera/crunch/types/avro/AvroTypeTest.java   |   37 +-
 8 files changed, 976 insertions(+), 731 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java b/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
index 82eb379..a3d673c 100644
--- a/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
+++ b/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
@@ -21,6 +21,7 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,47 +37,51 @@ import com.google.common.collect.UnmodifiableIterator;
 
 public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
 
-  private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
+	private static final Log LOG = LogFactory
+			.getLog(AvroFileReaderFactory.class);
 
-  private final DatumReader<T> recordReader;
-  private final MapFn<T, T> mapFn;
-  private final Configuration conf;
-  
-  public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
-    this.recordReader = createDatumReader(atype);
-    this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
-    this.conf = conf;
-  }
+	private final DatumReader<T> recordReader;
+	private final MapFn<T, T> mapFn;
+	private final Configuration conf;
 
-  private DatumReader<T> createDatumReader(AvroType<T> avroType) {
-    if (avroType.isSpecific()) {
-      return new SpecificDatumReader<T>(avroType.getSchema());
-    } else {
-      return new GenericDatumReader<T>(avroType.getSchema());
-    }
-  }
+	public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
+		this.recordReader = createDatumReader(atype);
+		this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
+		this.conf = conf;
+	}
 
-  @Override
-  public Iterator<T> read(FileSystem fs, final Path path) {
-    this.mapFn.setConfigurationForTest(conf);
-    this.mapFn.initialize();
-    try {
-      FsInput fsi = new FsInput(path, fs.getConf());
-      final DataFileReader<T> reader = new DataFileReader<T>(fsi, recordReader);
-      return new UnmodifiableIterator<T>() {
-        @Override
-        public boolean hasNext() {
-          return reader.hasNext();
-        }
+	private DatumReader<T> createDatumReader(AvroType<T> avroType) {
+		if (avroType.isSpecific()) {
+			return new SpecificDatumReader<T>(avroType.getSchema());
+		} else if (avroType.isGeneric()) {
+			return new GenericDatumReader<T>(avroType.getSchema());
+		} else {
+			return new ReflectDatumReader<T>(avroType.getSchema());
+		}
+	}
 
-        @Override
-        public T next() {
-          return mapFn.map(reader.next());
-        }
-      };
-    } catch (IOException e) {
-      LOG.info("Could not read avro file at path: " + path, e);
-      return Iterators.emptyIterator();
-    }
-  }
+	@Override
+	public Iterator<T> read(FileSystem fs, final Path path) {
+		this.mapFn.setConfigurationForTest(conf);
+		this.mapFn.initialize();
+		try {
+			FsInput fsi = new FsInput(path, fs.getConf());
+			final DataFileReader<T> reader = new DataFileReader<T>(fsi,
+					recordReader);
+			return new UnmodifiableIterator<T>() {
+				@Override
+				public boolean hasNext() {
+					return reader.hasNext();
+				}
+
+				@Override
+				public T next() {
+					return mapFn.map(reader.next());
+				}
+			};
+		} catch (IOException e) {
+			LOG.info("Could not read avro file at path: " + path, e);
+			return Iterators.emptyIterator();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
index 9a93ce3..8d71b7f 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
@@ -26,123 +26,133 @@ import com.cloudera.crunch.types.PType;
 
 /**
  * The implementation of the PTableType interface for Avro-based serialization.
- *
+ * 
  */
-public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K, V> {
-  
-  private static class PairToAvroPair extends MapFn<Pair, org.apache.avro.mapred.Pair> {
-    private final MapFn keyMapFn;
-    private final MapFn valueMapFn;
-    private final String firstJson;
-    private final String secondJson;
-    
-    private String pairSchemaJson;
-    private transient Schema pairSchema;
-    
-    public PairToAvroPair(AvroType keyType, AvroType valueType) {
-      this.keyMapFn = keyType.getOutputMapFn();
-      this.firstJson = keyType.getSchema().toString();
-      this.valueMapFn = valueType.getOutputMapFn();
-      this.secondJson = valueType.getSchema().toString();
-    }
-    
-    @Override
-    public void configure(Configuration conf) {
-      keyMapFn.configure(conf);
-      valueMapFn.configure(conf);
-    }
-
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      keyMapFn.setConfigurationForTest(conf);
-      valueMapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
-    public void initialize() {
-      keyMapFn.setContext(getContext());
-      valueMapFn.setContext(getContext());
-      pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
-          new Schema.Parser().parse(firstJson),
-          new Schema.Parser().parse(secondJson)).toString();
-    }
-    
-    @Override
-    public org.apache.avro.mapred.Pair map(Pair input) {
-      if(pairSchema == null) {
-        pairSchema = new Schema.Parser().parse(pairSchemaJson);
-      }
-      org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(pairSchema);
-      avroPair.key(keyMapFn.map(input.first()));
-      avroPair.value(valueMapFn.map(input.second()));
-      return avroPair;
-    }
-  }
-  
-  private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> {
-    
-    private final MapFn firstMapFn;
-    private final MapFn secondMapFn;
-    
-    public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
-      this.firstMapFn = firstMapFn;
-      this.secondMapFn = secondMapFn;
-    }
-    
-    @Override
-    public void configure(Configuration conf) {
-      firstMapFn.configure(conf);
-      secondMapFn.configure(conf);
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      firstMapFn.setConfigurationForTest(conf);
-      secondMapFn.setConfigurationForTest(conf);
-    }
-    
-    @Override
-    public void initialize() {
-      firstMapFn.setContext(getContext());
-      secondMapFn.setContext(getContext());
-    }
-    
-    @Override
-    public Pair map(IndexedRecord input) {
-      return Pair.of(firstMapFn.map(input.get(0)), secondMapFn.map(input.get(1)));
-    }
-  }
-  
-  private final AvroType<K> keyType;
-  private final AvroType<V> valueType;
-  
-  public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
-    super(pairClass,
-        org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), valueType.getSchema()),
-        new IndexedRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()),
-        new PairToAvroPair(keyType, valueType), keyType, valueType);
-    this.keyType = keyType;
-    this.valueType = valueType;
-  }
-  
-  @Override
-  public boolean isSpecific() {
-	return keyType.isSpecific() || valueType.isSpecific();
-  }
-  
-  @Override
-  public PType<K> getKeyType() {
-    return keyType;
-  }
-
-  @Override
-  public PType<V> getValueType() {
-    return valueType;
-  }
-
-  @Override
-  public PGroupedTableType<K, V> getGroupedTableType() {
-    return new AvroGroupedTableType<K, V>(this);
-  }
+public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements
+		PTableType<K, V> {
+
+	private static class PairToAvroPair extends
+			MapFn<Pair, org.apache.avro.mapred.Pair> {
+		private final MapFn keyMapFn;
+		private final MapFn valueMapFn;
+		private final String firstJson;
+		private final String secondJson;
+
+		private String pairSchemaJson;
+		private transient Schema pairSchema;
+
+		public PairToAvroPair(AvroType keyType, AvroType valueType) {
+			this.keyMapFn = keyType.getOutputMapFn();
+			this.firstJson = keyType.getSchema().toString();
+			this.valueMapFn = valueType.getOutputMapFn();
+			this.secondJson = valueType.getSchema().toString();
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			keyMapFn.configure(conf);
+			valueMapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			keyMapFn.setConfigurationForTest(conf);
+			valueMapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			keyMapFn.setContext(getContext());
+			valueMapFn.setContext(getContext());
+			pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
+					new Schema.Parser().parse(firstJson),
+					new Schema.Parser().parse(secondJson)).toString();
+		}
+
+		@Override
+		public org.apache.avro.mapred.Pair map(Pair input) {
+			if (pairSchema == null) {
+				pairSchema = new Schema.Parser().parse(pairSchemaJson);
+			}
+			org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(
+					pairSchema);
+			avroPair.key(keyMapFn.map(input.first()));
+			avroPair.value(valueMapFn.map(input.second()));
+			return avroPair;
+		}
+	}
+
+	private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> {
+
+		private final MapFn firstMapFn;
+		private final MapFn secondMapFn;
+
+		public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
+			this.firstMapFn = firstMapFn;
+			this.secondMapFn = secondMapFn;
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			firstMapFn.configure(conf);
+			secondMapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			firstMapFn.setConfigurationForTest(conf);
+			secondMapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			firstMapFn.setContext(getContext());
+			secondMapFn.setContext(getContext());
+		}
+
+		@Override
+		public Pair map(IndexedRecord input) {
+			return Pair.of(firstMapFn.map(input.get(0)),
+					secondMapFn.map(input.get(1)));
+		}
+	}
+
+	private final AvroType<K> keyType;
+	private final AvroType<V> valueType;
+
+	public AvroTableType(AvroType<K> keyType, AvroType<V> valueType,
+			Class<Pair<K, V>> pairClass) {
+		super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(
+				keyType.getSchema(), valueType.getSchema()),
+				new IndexedRecordToPair(keyType.getInputMapFn(),
+						valueType.getInputMapFn()), new PairToAvroPair(keyType,
+						valueType), keyType, valueType);
+		this.keyType = keyType;
+		this.valueType = valueType;
+	}
+
+	@Override
+	public boolean isSpecific() {
+		return keyType.isSpecific() || valueType.isSpecific();
+	}
+
+	@Override
+	public boolean isGeneric() {
+		return keyType.isGeneric() || valueType.isGeneric();
+	}
+
+	@Override
+	public PType<K> getKeyType() {
+		return keyType;
+	}
+
+	@Override
+	public PType<V> getValueType() {
+		return valueType;
+	}
+
+	@Override
+	public PGroupedTableType<K, V> getGroupedTableType() {
+		return new AvroGroupedTableType<K, V>(this);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
index e9696d8..3db00c0 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
@@ -17,6 +17,7 @@ package com.cloudera.crunch.types.avro;
 import java.util.List;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.Path;
@@ -46,8 +47,8 @@ public class AvroType<T> implements PType<T> {
 	private final List<PType> subTypes;
 
 	public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
-		this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(),
-				ptypes);
+		this(typeClass, schema, IdentityFn.getInstance(), IdentityFn
+				.getInstance(), ptypes);
 	}
 
 	public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn,
@@ -79,20 +80,29 @@ public class AvroType<T> implements PType<T> {
 	}
 
 	/**
-	 * Determine if the wrapped type is a specific or generic avro type.
+	 * Determine if the wrapped type is a specific data avro type.
 	 * 
 	 * @return true if the wrapped type is a specific data type
 	 */
 	public boolean isSpecific() {
-	  if (SpecificRecord.class.isAssignableFrom(typeClass)) {
-	    return true;
-	  }
-	  for (PType ptype : subTypes) {
-	    if (SpecificRecord.class.isAssignableFrom(ptype.getTypeClass())) {
-	      return true;
-	    }
-	  }
-	  return false;
+		if (SpecificRecord.class.isAssignableFrom(typeClass)) {
+			return true;
+		}
+		for (PType ptype : subTypes) {
+			if (SpecificRecord.class.isAssignableFrom(ptype.getTypeClass())) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * Determine if the wrapped type is a generic data avro type.
+	 * 
+	 * @return true if the wrapped type is a generic type
+	 */
+	public boolean isGeneric() {
+		return GenericData.Record.class.equals(typeClass);
 	}
 
 	public MapFn<Object, T> getInputMapFn() {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/main/java/com/cloudera/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/Avros.java b/src/main/java/com/cloudera/crunch/types/avro/Avros.java
index d084e3b..d2edae2 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/Avros.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/Avros.java
@@ -55,530 +55,566 @@ import com.google.common.collect.Maps;
 /**
  * Defines static methods that are analogous to the methods defined in
  * {@link AvroTypeFamily} for convenient static importing.
- *
+ * 
  */
 public class Avros {
 
-  /**
-   * The instance we use for generating reflected schemas. May be modified by clients (e.g., Scrunch.)
-   */
-  public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
-
-  /**
-   * The name of the configuration parameter that tracks which reflection factory to use.
-   */
-  public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
-  
-  public static void configureReflectDataFactory(Configuration conf) {
-    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(),
-        ReflectDataFactory.class);
-  }
-  
-  public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
-    return (ReflectDataFactory) ReflectionUtils.newInstance(
-        conf.getClass(REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);  
-  }
-  
-  public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
-    @Override
-    public String map(CharSequence input) {
-      return input.toString();
-    }
-  };
-  
-  public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
-    @Override
-    public Utf8 map(String input) {
-      return new Utf8(input);
-    }
-  };
-
-  public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
-    @Override
-    public ByteBuffer map(Object input) {
-      if (input instanceof ByteBuffer) {
-        return (ByteBuffer) input;
-      }
-      return ByteBuffer.wrap((byte[]) input);
-    }
-  };
-
-  private static final AvroType<String> strings = new AvroType<String>(
-      String.class, Schema.create(Schema.Type.STRING), UTF8_TO_STRING, STRING_TO_UTF8);
-  private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
-  private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
-  private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
-  private static final AvroType<Float> floats = create(Float.class, Schema.Type.FLOAT);
-  private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE);
-  private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
-  private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(
-	  ByteBuffer.class, Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance());
-  
-  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>>builder()
-      .put(String.class, strings)
-      .put(Long.class, longs)
-      .put(Integer.class, ints)
-      .put(Float.class, floats)
-      .put(Double.class, doubles)
-      .put(Boolean.class, booleans)
-      .put(ByteBuffer.class, bytes)
-      .build();
-  
-  private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
-  
-  public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
-    EXTENSIONS.put(clazz, ptype);
-  }
-  
-  public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
-    return (PType<T>) PRIMITIVES.get(clazz);
-  }
-  
-  private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
-    return new AvroType<T>(clazz, Schema.create(schemaType));
-  }
-
-  public static final AvroType<Void> nulls() {
-    return nulls;
-  }
-
-  public static final AvroType<String> strings() {
-    return strings;
-  }
-
-  public static final AvroType<Long> longs() {
-    return longs;
-  }
-
-  public static final AvroType<Integer> ints() {
-    return ints;
-  }
-
-  public static final AvroType<Float> floats() {
-    return floats;
-  }
-
-  public static final AvroType<Double> doubles() {
-    return doubles;
-  }
-
-  public static final AvroType<Boolean> booleans() {
-    return booleans;
-  }
-  
-  public static final AvroType<ByteBuffer> bytes() {
-    return bytes;
-  }
-  
-  public static final <T> AvroType<T> records(Class<T> clazz) {
-    if (EXTENSIONS.containsKey(clazz)) {
-      return (AvroType<T>) EXTENSIONS.get(clazz);
-    }
-    return containers(clazz);
-  }
-
-  public static final AvroType<GenericData.Record> generics(Schema schema) {
-    return new AvroType<GenericData.Record>(GenericData.Record.class, schema);
-  }
-  
-  public static final <T> AvroType<T> containers(Class<T> clazz) {
-    return reflects(clazz);
-  }
-  
-  public static final <T> AvroType<T> reflects(Class<T> clazz) {
-    return new AvroType<T>(clazz, REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz));
-  }
-  
-  private static class BytesToWritableMapFn<T extends Writable> extends MapFn<ByteBuffer, T> {
-    private static final Log LOG = LogFactory.getLog(BytesToWritableMapFn.class);
-    
-    private final Class<T> writableClazz;
-    
-    public BytesToWritableMapFn(Class<T> writableClazz) {
-      this.writableClazz = writableClazz;
-    }
-    
-    @Override
-    public T map(ByteBuffer input) {
-      T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration());
-      try {
-        instance.readFields(new DataInputStream(new ByteArrayInputStream(
-            input.array(), input.arrayOffset(), input.limit())));
-      } catch (IOException e) {
-        LOG.error("Exception thrown reading instance of: " + writableClazz, e);
-      }
-      return instance;
-    } 
-  }
-  
-  private static class WritableToBytesMapFn<T extends Writable> extends MapFn<T, ByteBuffer> {
-    private static final Log LOG = LogFactory.getLog(WritableToBytesMapFn.class);
-    
-    @Override
-    public ByteBuffer map(T input) {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      DataOutputStream das = new DataOutputStream(baos);
-      try {
-        input.write(das);
-      } catch (IOException e) {
-        LOG.error("Exception thrown converting Writable to bytes", e);
-      }
-      return ByteBuffer.wrap(baos.toByteArray());
-    }
-  }
-  
-  public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
-    return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
-        new WritableToBytesMapFn<T>());
-  }
-  
-  private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
-
-    private final MapFn<Object,T> mapFn;
-    
-    public GenericDataArrayToCollection(MapFn<Object,T> mapFn) {
-      this.mapFn = mapFn;
-    }
-    
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-    
-    @Override
-    public void initialize() {
-      this.mapFn.setContext(getContext());
-    }
-    
-    @Override
-    public Collection<T> map(Object input) {
-      Collection<T> ret = Lists.newArrayList();
-      if (input instanceof Collection) {
-        for (Object in : (Collection<Object>) input) {
-          ret.add(mapFn.map(in));
-        }
-      } else {
-        // Assume it is an array
-        Object[] arr = (Object[]) input;
-        for (Object in : arr) {
-          ret.add(mapFn.map(in));
-        }
-      }
-      return ret;
-    }
-  }
-  
-  private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
-    
-    private final MapFn mapFn;
-    private final String jsonSchema;
-    private transient Schema schema;
-    
-    public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
-      this.mapFn = mapFn;
-      this.jsonSchema = schema.toString();
-    }
-    
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
-    public void initialize() {
-      this.mapFn.setContext(getContext());
-    }
-    
-    @Override
-    public GenericData.Array<?> map(Collection<?> input) {
-      if(schema == null) {
-        schema = new Schema.Parser().parse(jsonSchema);
-      }
-      GenericData.Array array = new GenericData.Array(input.size(), schema);
-      for (Object in : input) {
-        array.add(mapFn.map(in));
-      }
-      return array;
-    }
-  }
-  
-  public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
-    AvroType<T> avroType = (AvroType<T>) ptype;
-    Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
-    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
-    CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
-    return new AvroType(Collection.class, collectionSchema, input, output, ptype);
-  }
-
-  private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> {
-    private final MapFn<Object, T> mapFn;
-
-    public AvroMapToMap(MapFn<Object, T> mapFn) {
-      this.mapFn = mapFn;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
-    public void initialize() {
-      this.mapFn.setContext(getContext());
-    }
-
-    @Override
-    public Map<String, T> map(Map<CharSequence, Object> input) {
-      Map<String, T> out = Maps.newHashMap();
-      for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
-        out.put(e.getKey().toString(), mapFn.map(e.getValue()));
-      }
-      return out;
-    }
-  }
-
-  private static class MapToAvroMap<T> extends MapFn<Map<String, T>, Map<Utf8, Object>> {
-    private final MapFn<T, Object> mapFn;
-
-    public MapToAvroMap(MapFn<T, Object> mapFn) {
-      this.mapFn = mapFn;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
-    public void initialize() {
-      this.mapFn.setContext(getContext());
-    }
-
-    @Override
-    public Map<Utf8, Object> map(Map<String, T> input) {
-      Map<Utf8, Object> out = Maps.newHashMap();
-      for (Map.Entry<String, T> e : input.entrySet()) {
-        out.put(new Utf8(e.getKey()), mapFn.map(e.getValue()));
-      }
-      return out;
-    }
-  }
-
-  public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
-    AvroType<T> avroType = (AvroType<T>) ptype;
-    Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
-    AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
-    MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn());
-    return new AvroType(Map.class, mapSchema, inputFn, outputFn, ptype);
-  }
-  
-  private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> {
-    private final TupleFactory<?> tupleFactory;
-    private final List<MapFn> fns;
-    
-    private transient Object[] values;
-    
-    public GenericRecordToTuple(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
-      this.tupleFactory = tupleFactory;
-      this.fns = Lists.newArrayList();
-      for (PType<?> ptype : ptypes) {
-        AvroType atype = (AvroType) ptype;
-        fns.add(atype.getInputMapFn());
-      }
-    }
-    
-    @Override
-    public void configure(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.configure(conf);
-      }
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.setConfigurationForTest(conf);
-      }
-    }
-
-    @Override
-    public void initialize() {
-      for (MapFn fn : fns) {
-        fn.setContext(getContext());
-      }
-      this.values = new Object[fns.size()];
-      tupleFactory.initialize();
-    }
-
-    @Override
-    public Tuple map(GenericRecord input) {
-      for (int i = 0; i < values.length; i++) {
-        Object v = input.get(i);
-        if (v == null) {
-          values[i] = null;
-        } else {
-          values[i] = fns.get(i).map(v);
-        }
-      }
-      return tupleFactory.makeTuple(values);
-    }
-  }
-  
-  private static class TupleToGenericRecord extends MapFn<Tuple, GenericRecord> {
-    private final List<MapFn> fns;
-    private final String jsonSchema;
-    
-    private transient GenericRecord record;
-    
-    public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
-      this.fns = Lists.newArrayList();
-      this.jsonSchema = schema.toString();
-      for (PType ptype : ptypes) {
-        AvroType atype = (AvroType) ptype;
-        fns.add(atype.getOutputMapFn());
-      }
-    }
-    
-    @Override
-    public void configure(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.configure(conf);
-      }
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.setConfigurationForTest(conf);
-      }
-    }
-
-    @Override
-    public void initialize() {
-      this.record = new GenericData.Record(new Schema.Parser().parse(jsonSchema));
-      for (MapFn fn : fns) {
-        fn.setContext(getContext());
-      }
-    }
-
-    @Override
-    public GenericRecord map(Tuple input) {
-      for (int i = 0; i < input.size(); i++) {
-        Object v = input.get(i);
-        if (v == null) {
-          record.put(i, null);
-        } else {
-          record.put(i, fns.get(i).map(v));
-        }
-      }
-      return record;
-    }
-  }
-  
-  public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
-    Schema schema = createTupleSchema(p1, p2);
-    GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2);
-    TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
-    return new AvroType(Pair.class, schema, input, output, p1, p2);
-  }
-
-  public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3) {
-    Schema schema = createTupleSchema(p1, p2, p3);
-    return new AvroType(Tuple3.class, schema,
-        new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
-        new TupleToGenericRecord(schema, p1, p2, p3),
-        p1, p2, p3);
-  }
-
-  public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
-    Schema schema = createTupleSchema(p1, p2, p3, p4);
-    return new AvroType(Tuple4.class, schema,
-        new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
-        new TupleToGenericRecord(schema, p1, p2, p3, p4),
-        p1, p2, p3, p4);
-  }
-
-  public static final AvroType<TupleN> tuples(PType... ptypes) {
-    Schema schema = createTupleSchema(ptypes);
-    return new AvroType(TupleN.class, schema,
-        new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
-        new TupleToGenericRecord(schema, ptypes), ptypes);
-  }
-  
-  public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
-    Schema schema = createTupleSchema(ptypes);
-    Class[] typeArgs = new Class[ptypes.length];
-    for (int i = 0; i < typeArgs.length; i++) {
-      typeArgs[i] = ptypes[i].getTypeClass();
-    }
-    TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
-    return new AvroType<T>(clazz, schema,
-        new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema, ptypes),
-        ptypes);  
-  }
-  
-  private static Schema createTupleSchema(PType<?>... ptypes) {
-    // Guarantee each tuple schema has a globally unique name
-    String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
-    Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
-    List<Schema.Field> fields = Lists.newArrayList();
-    for (int i = 0; i < ptypes.length; i++) {
-      AvroType atype = (AvroType) ptypes[i];
-      Schema fieldSchema = allowNulls(atype.getSchema());
-      fields.add(new Schema.Field("v" + i, fieldSchema, "", null));
-    }
-    schema.setFields(fields);
-    return schema;
-  }
-  
-  public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
-      MapFn<T, S> outputFn, PType<S> base) {
-    AvroType<S> abase = (AvroType<S>) base;
-    return new AvroType<T>(clazz, abase.getSchema(),
-        new CompositeMapFn(abase.getInputMapFn(), inputFn),
-        new CompositeMapFn(outputFn, abase.getOutputMapFn()),
-        base.getSubTypes().toArray(new PType[0]));
-  }
-  
-  public static <T> PType<T> jsons(Class<T> clazz) {
-    return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());  
-  }
-  
-  public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key, PType<V> value) {
-    AvroType<K> avroKey = (AvroType<K>) key;
-    AvroType<V> avroValue = (AvroType<V>) value;    
-    return new AvroTableType(avroKey, avroValue, Pair.class);
-  }
-
-  private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
-  private static Schema allowNulls(Schema base) {
-    if (NULL_SCHEMA.equals(base)) {
-      return base;
-    }
-    return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA));
-  }
-  
-  private Avros() {}
+	/**
+	 * Factory used to derive reflected Avro schemas. Intentionally left
+	 * non-final so that clients (e.g., Scrunch) can install their own
+	 * implementation.
+	 */
+	public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
+
+	/**
+	 * Configuration key that records which {@link ReflectDataFactory}
+	 * class is in use.
+	 */
+	public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
+
+	/**
+	 * Stores the class of {@link #REFLECT_DATA_FACTORY} in {@code conf} under
+	 * {@link #REFLECT_DATA_FACTORY_CLASS}.
+	 */
+	public static void configureReflectDataFactory(Configuration conf) {
+		Class<? extends ReflectDataFactory> factoryClass = REFLECT_DATA_FACTORY
+				.getClass();
+		conf.setClass(REFLECT_DATA_FACTORY_CLASS, factoryClass,
+				ReflectDataFactory.class);
+	}
+
+	/**
+	 * Creates a new instance of the factory class stored in {@code conf},
+	 * defaulting to {@link ReflectDataFactory} when the key is not set.
+	 */
+	public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
+		Class<?> factoryClass = conf.getClass(REFLECT_DATA_FACTORY_CLASS,
+				ReflectDataFactory.class);
+		return (ReflectDataFactory) ReflectionUtils.newInstance(factoryClass,
+				conf);
+	}
+
+	/** Converts Avro string values (e.g. {@link Utf8}) into Java Strings. */
+	public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
+		@Override
+		public String map(CharSequence text) {
+			return text.toString();
+		}
+	};
+
+	/** Converts Java Strings into Avro {@link Utf8} values. */
+	public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
+		@Override
+		public Utf8 map(String text) {
+			return new Utf8(text);
+		}
+	};
+
+	/**
+	 * Normalizes Avro bytes values to a {@link ByteBuffer}: buffers pass
+	 * through unchanged, anything else is treated as a {@code byte[]} and
+	 * wrapped.
+	 */
+	public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
+		@Override
+		public ByteBuffer map(Object raw) {
+			return (raw instanceof ByteBuffer) ? (ByteBuffer) raw : ByteBuffer
+					.wrap((byte[]) raw);
+		}
+	};
+
+	// Built-in Avro types for primitive/boxed values. strings and bytes are
+	// built with explicit conversion functions; the others go through
+	// create(...). NOTE: strings refers to UTF8_TO_STRING/STRING_TO_UTF8 and
+	// bytes to BYTES_IN, so these must stay declared after those constants
+	// (static initializers run in textual order).
+	private static final AvroType<String> strings = new AvroType<String>(
+			String.class, Schema.create(Schema.Type.STRING), UTF8_TO_STRING,
+			STRING_TO_UTF8);
+	private static final AvroType<Void> nulls = create(Void.class,
+			Schema.Type.NULL);
+	private static final AvroType<Long> longs = create(Long.class,
+			Schema.Type.LONG);
+	private static final AvroType<Integer> ints = create(Integer.class,
+			Schema.Type.INT);
+	private static final AvroType<Float> floats = create(Float.class,
+			Schema.Type.FLOAT);
+	private static final AvroType<Double> doubles = create(Double.class,
+			Schema.Type.DOUBLE);
+	private static final AvroType<Boolean> booleans = create(Boolean.class,
+			Schema.Type.BOOLEAN);
+	private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(
+			ByteBuffer.class, Schema.create(Schema.Type.BYTES), BYTES_IN,
+			IdentityFn.getInstance());
+
+	// Lookup table backing getPrimitiveType(Class). Void (the nulls type) is
+	// not included.
+	private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
+			.<Class<?>, PType<?>> builder().put(String.class, strings)
+			.put(Long.class, longs).put(Integer.class, ints)
+			.put(Float.class, floats).put(Double.class, doubles)
+			.put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+
+	// Custom types added via register(...) and consulted first by
+	// records(Class). NOTE(review): a plain HashMap with no synchronization;
+	// registration is presumably expected during setup, before concurrent
+	// use — confirm.
+	private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps
+			.newHashMap();
+
+	/**
+	 * Registers a custom Avro type for {@code clazz}, which records(clazz)
+	 * will then return. Registering the same class again replaces the
+	 * earlier entry.
+	 */
+	public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
+		EXTENSIONS.put(clazz, ptype);
+	}
+
+	/**
+	 * Returns the built-in type for String, Long, Integer, Float, Double,
+	 * Boolean or ByteBuffer, or null for any other class.
+	 */
+	public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
+		return (PType<T>) PRIMITIVES.get(clazz);
+	}
+
+	/** Builds an AvroType for {@code clazz} backed by a primitive schema. */
+	private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
+		return new AvroType<T>(clazz, Schema.create(schemaType));
+	}
+
+	/** @return the Avro type for {@code null} values */
+	public static final AvroType<Void> nulls() {
+		return Avros.nulls;
+	}
+
+	/** @return the Avro type for Java Strings */
+	public static final AvroType<String> strings() {
+		return Avros.strings;
+	}
+
+	/** @return the Avro type for Longs */
+	public static final AvroType<Long> longs() {
+		return Avros.longs;
+	}
+
+	/** @return the Avro type for Integers */
+	public static final AvroType<Integer> ints() {
+		return Avros.ints;
+	}
+
+	/** @return the Avro type for Floats */
+	public static final AvroType<Float> floats() {
+		return Avros.floats;
+	}
+
+	/** @return the Avro type for Doubles */
+	public static final AvroType<Double> doubles() {
+		return Avros.doubles;
+	}
+
+	/** @return the Avro type for Booleans */
+	public static final AvroType<Boolean> booleans() {
+		return Avros.booleans;
+	}
+
+	/** @return the Avro type for raw bytes, exposed as ByteBuffers */
+	public static final AvroType<ByteBuffer> bytes() {
+		return Avros.bytes;
+	}
+
+	/**
+	 * Returns the type registered for {@code clazz} via {@link #register},
+	 * or a reflection-based type when nothing was registered.
+	 */
+	public static final <T> AvroType<T> records(Class<T> clazz) {
+		return EXTENSIONS.containsKey(clazz) ? (AvroType<T>) EXTENSIONS
+				.get(clazz) : containers(clazz);
+	}
+
+	/** Returns a type for {@link GenericData.Record}s with the given schema. */
+	public static final AvroType<GenericData.Record> generics(Schema schema) {
+		AvroType<GenericData.Record> genericType = new AvroType<GenericData.Record>(
+				GenericData.Record.class, schema);
+		return genericType;
+	}
+
+	/** Equivalent to {@link #reflects(Class)}. */
+	public static final <T> AvroType<T> containers(Class<T> clazz) {
+		return reflects(clazz);
+	}
+
+	/**
+	 * Returns a type whose schema is derived from {@code clazz} using the
+	 * ReflectData supplied by {@link #REFLECT_DATA_FACTORY}.
+	 */
+	public static final <T> AvroType<T> reflects(Class<T> clazz) {
+		Schema reflectedSchema = REFLECT_DATA_FACTORY.getReflectData()
+				.getSchema(clazz);
+		return new AvroType<T>(clazz, reflectedSchema);
+	}
+
+	/**
+	 * Input map function for {@link #writables}: rebuilds a Writable from the
+	 * bytes produced by its own write() method.
+	 */
+	private static class BytesToWritableMapFn<T extends Writable> extends
+			MapFn<ByteBuffer, T> {
+		private static final Log LOG = LogFactory
+				.getLog(BytesToWritableMapFn.class);
+
+		private final Class<T> writableClazz;
+
+		public BytesToWritableMapFn(Class<T> writableClazz) {
+			this.writableClazz = writableClazz;
+		}
+
+		/**
+		 * Deserializes a new instance of the configured Writable class from
+		 * the bytes between the buffer's position and limit. The buffer
+		 * itself is not modified. Read failures are logged and the instance
+		 * is returned anyway, possibly only partially populated.
+		 */
+		@Override
+		public T map(ByteBuffer input) {
+			T instance = ReflectionUtils.newInstance(writableClazz,
+					getConfiguration());
+			try {
+				instance.readFields(new DataInputStream(toInputStream(input)));
+			} catch (IOException e) {
+				LOG.error("Exception thrown reading instance of: "
+						+ writableClazz, e);
+			}
+			return instance;
+		}
+
+		/**
+		 * Exposes the remaining bytes of {@code buffer} as a stream. Both the
+		 * backing array's offset and the buffer's position are honored (using
+		 * limit() as a length is only correct when both are zero); buffers
+		 * without an accessible array (direct or read-only) are copied.
+		 */
+		private static ByteArrayInputStream toInputStream(ByteBuffer buffer) {
+			if (buffer.hasArray()) {
+				return new ByteArrayInputStream(buffer.array(),
+						buffer.arrayOffset() + buffer.position(),
+						buffer.remaining());
+			}
+			byte[] copy = new byte[buffer.remaining()];
+			// duplicate() so that reading does not move the caller's position.
+			buffer.duplicate().get(copy);
+			return new ByteArrayInputStream(copy);
+		}
+	}
+
+	/** Output map function for {@link #writables}: Writable to ByteBuffer. */
+	private static class WritableToBytesMapFn<T extends Writable> extends
+			MapFn<T, ByteBuffer> {
+		private static final Log LOG = LogFactory
+				.getLog(WritableToBytesMapFn.class);
+
+		@Override
+		public ByteBuffer map(T input) {
+			ByteArrayOutputStream sink = new ByteArrayOutputStream();
+			try {
+				input.write(new DataOutputStream(sink));
+			} catch (IOException e) {
+				LOG.error("Exception thrown converting Writable to bytes", e);
+			}
+			byte[] serialized = sink.toByteArray();
+			return ByteBuffer.wrap(serialized);
+		}
+	}
+
+	/**
+	 * Creates a type that stores Writables in an Avro bytes field, using the
+	 * Writable's own write()/readFields() serialization.
+	 */
+	public static final <T extends Writable> AvroType<T> writables(
+			Class<T> clazz) {
+		Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+		BytesToWritableMapFn<T> fromBytes = new BytesToWritableMapFn<T>(clazz);
+		WritableToBytesMapFn<T> toBytes = new WritableToBytesMapFn<T>();
+		return new AvroType<T>(clazz, bytesSchema, fromBytes, toBytes);
+	}
+
+	/**
+	 * Input map function for {@link #collections}: converts an Avro array
+	 * value into a Java collection.
+	 */
+	private static class GenericDataArrayToCollection<T> extends
+			MapFn<Object, Collection<T>> {
+
+		private final MapFn<Object, T> mapFn;
+
+		public GenericDataArrayToCollection(MapFn<Object, T> mapFn) {
+			this.mapFn = mapFn;
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			mapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			mapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			this.mapFn.setContext(getContext());
+		}
+
+		/**
+		 * @param input either a {@link Collection} (e.g. GenericData.Array) or
+		 *        a Java array, which may have a primitive component type when
+		 *        produced by reflection
+		 * @return a new list of the converted elements; null elements stay
+		 *         null because the array schema built by collections() allows
+		 *         them
+		 */
+		@Override
+		public Collection<T> map(Object input) {
+			Collection<T> ret = Lists.newArrayList();
+			if (input instanceof Collection) {
+				for (Object in : (Collection<Object>) input) {
+					ret.add(mapElement(in));
+				}
+			} else {
+				// Not a Collection, so assume it is a Java array. Access it
+				// reflectively so primitive arrays (int[], long[], ...) work
+				// as well as Object[].
+				int length = java.lang.reflect.Array.getLength(input);
+				for (int i = 0; i < length; i++) {
+					ret.add(mapElement(java.lang.reflect.Array.get(input, i)));
+				}
+			}
+			return ret;
+		}
+
+		/** Applies the element map function, passing nulls through as-is. */
+		private T mapElement(Object element) {
+			return element == null ? null : mapFn.map(element);
+		}
+	}
+
+	/**
+	 * Output map function for {@link #collections}: converts a Java collection
+	 * into a {@link GenericData.Array} with the collection's array schema.
+	 */
+	private static class CollectionToGenericDataArray extends
+			MapFn<Collection<?>, GenericData.Array<?>> {
+
+		private final MapFn mapFn;
+		// The schema is kept as JSON text and re-parsed lazily on first use.
+		// NOTE(review): presumably because Schema is not Serializable —
+		// confirm.
+		private final String jsonSchema;
+		private transient Schema schema;
+
+		public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
+			this.mapFn = mapFn;
+			this.jsonSchema = schema.toString();
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			mapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			mapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			this.mapFn.setContext(getContext());
+		}
+
+		@Override
+		public GenericData.Array<?> map(Collection<?> input) {
+			if (schema == null) {
+				schema = new Schema.Parser().parse(jsonSchema);
+			}
+			GenericData.Array array = new GenericData.Array(input.size(),
+					schema);
+			for (Object in : input) {
+				// collections() wraps the element schema in a union with null,
+				// so null elements are written as-is rather than handed to the
+				// element map function.
+				array.add(in == null ? null : mapFn.map(in));
+			}
+			return array;
+		}
+	}
+
+	/**
+	 * Creates a type for collections of {@code ptype}, stored as an Avro array
+	 * whose element schema also admits null.
+	 */
+	public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
+		AvroType<T> elementType = (AvroType<T>) ptype;
+		Schema collectionSchema = Schema.createArray(allowNulls(elementType
+				.getSchema()));
+		GenericDataArrayToCollection<T> fromAvro = new GenericDataArrayToCollection<T>(
+				elementType.getInputMapFn());
+		CollectionToGenericDataArray toAvro = new CollectionToGenericDataArray(
+				collectionSchema, elementType.getOutputMapFn());
+		return new AvroType(Collection.class, collectionSchema, fromAvro,
+				toAvro, ptype);
+	}
+
+	/**
+	 * Input map function for {@link #maps}: converts an Avro map (keys are
+	 * CharSequences such as Utf8) into a {@code Map<String, T>}.
+	 */
+	private static class AvroMapToMap<T> extends
+			MapFn<Map<CharSequence, Object>, Map<String, T>> {
+		private final MapFn<Object, T> mapFn;
+
+		public AvroMapToMap(MapFn<Object, T> mapFn) {
+			this.mapFn = mapFn;
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			mapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			mapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			this.mapFn.setContext(getContext());
+		}
+
+		@Override
+		public Map<String, T> map(Map<CharSequence, Object> input) {
+			Map<String, T> out = Maps.newHashMap();
+			for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
+				Object value = e.getValue();
+				// maps() declares the value schema as a union with null, so
+				// null values are passed through rather than handed to the
+				// value map function (mirroring GenericRecordToTuple).
+				out.put(e.getKey().toString(), value == null ? null : mapFn
+						.map(value));
+			}
+			return out;
+		}
+	}
+
+	/**
+	 * Output map function for {@link #maps}: converts a
+	 * {@code Map<String, T>} into an Avro map keyed by {@link Utf8}.
+	 */
+	private static class MapToAvroMap<T> extends
+			MapFn<Map<String, T>, Map<Utf8, Object>> {
+		private final MapFn<T, Object> mapFn;
+
+		public MapToAvroMap(MapFn<T, Object> mapFn) {
+			this.mapFn = mapFn;
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			mapFn.configure(conf);
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			mapFn.setConfigurationForTest(conf);
+		}
+
+		@Override
+		public void initialize() {
+			this.mapFn.setContext(getContext());
+		}
+
+		@Override
+		public Map<Utf8, Object> map(Map<String, T> input) {
+			Map<Utf8, Object> out = Maps.newHashMap();
+			for (Map.Entry<String, T> e : input.entrySet()) {
+				T value = e.getValue();
+				// maps() declares the value schema as a union with null, so
+				// null values are written as-is rather than handed to the
+				// value map function (mirroring TupleToGenericRecord).
+				out.put(new Utf8(e.getKey()), value == null ? null : mapFn
+						.map(value));
+			}
+			return out;
+		}
+	}
+
+	/**
+	 * Creates a type for String-keyed maps whose values are of type
+	 * {@code ptype}; the Avro value schema also admits null.
+	 */
+	public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
+		AvroType<T> valueType = (AvroType<T>) ptype;
+		Schema valueSchema = allowNulls(valueType.getSchema());
+		return new AvroType(Map.class, Schema.createMap(valueSchema),
+				new AvroMapToMap<T>(valueType.getInputMapFn()),
+				new MapToAvroMap<T>(valueType.getOutputMapFn()), ptype);
+	}
+
+	/**
+	 * Input map function for tuple types: converts an Avro record whose
+	 * fields hold the tuple elements by position into a Crunch {@link Tuple}
+	 * built by the configured TupleFactory.
+	 */
+	private static class GenericRecordToTuple extends
+			MapFn<GenericRecord, Tuple> {
+		private final TupleFactory<?> tupleFactory;
+		// One input map function per tuple position, taken from the
+		// corresponding AvroType.
+		private final List<MapFn> fns;
+
+		// Scratch buffer allocated in initialize() and reused by every call to
+		// map(). NOTE(review): the same array is handed to makeTuple() each
+		// time; confirm TupleFactory copies it rather than retaining it.
+		private transient Object[] values;
+
+		public GenericRecordToTuple(TupleFactory<?> tupleFactory,
+				PType<?>... ptypes) {
+			this.tupleFactory = tupleFactory;
+			this.fns = Lists.newArrayList();
+			for (PType<?> ptype : ptypes) {
+				AvroType atype = (AvroType) ptype;
+				fns.add(atype.getInputMapFn());
+			}
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			for (MapFn fn : fns) {
+				fn.configure(conf);
+			}
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			for (MapFn fn : fns) {
+				fn.setConfigurationForTest(conf);
+			}
+		}
+
+		@Override
+		public void initialize() {
+			for (MapFn fn : fns) {
+				fn.setContext(getContext());
+			}
+			this.values = new Object[fns.size()];
+			tupleFactory.initialize();
+		}
+
+		/**
+		 * Reads fields by position, converting each non-null value with the
+		 * matching map function; null fields stay null. Must only be called
+		 * after initialize(), which allocates {@code values}.
+		 */
+		@Override
+		public Tuple map(GenericRecord input) {
+			for (int i = 0; i < values.length; i++) {
+				Object v = input.get(i);
+				if (v == null) {
+					values[i] = null;
+				} else {
+					values[i] = fns.get(i).map(v);
+				}
+			}
+			return tupleFactory.makeTuple(values);
+		}
+	}
+
+	/**
+	 * Output map function for tuple types: writes the elements of a Crunch
+	 * {@link Tuple} by position into an Avro record with the tuple schema.
+	 */
+	private static class TupleToGenericRecord extends
+			MapFn<Tuple, GenericRecord> {
+		// One output map function per tuple position.
+		private final List<MapFn> fns;
+		// Schema kept as JSON text and parsed in initialize(). NOTE(review):
+		// presumably because Schema is not Serializable — confirm.
+		private final String jsonSchema;
+
+		// Created once in initialize(). map() fills in and returns this same
+		// instance on every call, so a returned record is overwritten by the
+		// next call and must not be retained by callers.
+		private transient GenericRecord record;
+
+		public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
+			this.fns = Lists.newArrayList();
+			this.jsonSchema = schema.toString();
+			for (PType ptype : ptypes) {
+				AvroType atype = (AvroType) ptype;
+				fns.add(atype.getOutputMapFn());
+			}
+		}
+
+		@Override
+		public void configure(Configuration conf) {
+			for (MapFn fn : fns) {
+				fn.configure(conf);
+			}
+		}
+
+		@Override
+		public void setConfigurationForTest(Configuration conf) {
+			for (MapFn fn : fns) {
+				fn.setConfigurationForTest(conf);
+			}
+		}
+
+		@Override
+		public void initialize() {
+			this.record = new GenericData.Record(
+					new Schema.Parser().parse(jsonSchema));
+			for (MapFn fn : fns) {
+				fn.setContext(getContext());
+			}
+		}
+
+		/**
+		 * Converts each non-null tuple element with the matching map function
+		 * and stores it at the same field position; nulls are stored as null.
+		 * Assumes the tuple's size matches the number of map functions.
+		 */
+		@Override
+		public GenericRecord map(Tuple input) {
+			for (int i = 0; i < input.size(); i++) {
+				Object v = input.get(i);
+				if (v == null) {
+					record.put(i, null);
+				} else {
+					record.put(i, fns.get(i).map(v));
+				}
+			}
+			return record;
+		}
+	}
+
+	/** Creates an Avro type for {@link Pair}s of the two given types. */
+	public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1,
+			PType<V2> p2) {
+		Schema schema = createTupleSchema(p1, p2);
+		return new AvroType(Pair.class, schema, new GenericRecordToTuple(
+				TupleFactory.PAIR, p1, p2), new TupleToGenericRecord(schema,
+				p1, p2), p1, p2);
+	}
+
+	/** Creates an Avro type for {@link Tuple3}s of the three given types. */
+	public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(
+			PType<V1> p1, PType<V2> p2, PType<V3> p3) {
+		Schema schema = createTupleSchema(p1, p2, p3);
+		GenericRecordToTuple input = new GenericRecordToTuple(
+				TupleFactory.TUPLE3, p1, p2, p3);
+		TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2,
+				p3);
+		return new AvroType(Tuple3.class, schema, input, output, p1, p2, p3);
+	}
+
+	/** Creates an Avro type for {@link Tuple4}s of the four given types. */
+	public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(
+			PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+		Schema schema = createTupleSchema(p1, p2, p3, p4);
+		GenericRecordToTuple input = new GenericRecordToTuple(
+				TupleFactory.TUPLE4, p1, p2, p3, p4);
+		TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2,
+				p3, p4);
+		return new AvroType(Tuple4.class, schema, input, output, p1, p2, p3,
+				p4);
+	}
+
+	/** Creates an Avro type for {@link TupleN}s of arbitrary arity. */
+	public static final AvroType<TupleN> tuples(PType... ptypes) {
+		Schema schema = createTupleSchema(ptypes);
+		GenericRecordToTuple input = new GenericRecordToTuple(
+				TupleFactory.TUPLEN, ptypes);
+		TupleToGenericRecord output = new TupleToGenericRecord(schema, ptypes);
+		return new AvroType(TupleN.class, schema, input, output, ptypes);
+	}
+
+	/**
+	 * Creates an Avro type for a custom {@link Tuple} subclass whose elements
+	 * have the given types.
+	 */
+	public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz,
+			PType... ptypes) {
+		Schema schema = createTupleSchema(ptypes);
+		Class[] typeArgs = new Class[ptypes.length];
+		int index = 0;
+		for (PType ptype : ptypes) {
+			typeArgs[index++] = ptype.getTypeClass();
+		}
+		TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
+		GenericRecordToTuple input = new GenericRecordToTuple(factory, ptypes);
+		TupleToGenericRecord output = new TupleToGenericRecord(schema, ptypes);
+		return new AvroType<T>(clazz, schema, input, output, ptypes);
+	}
+
+	/**
+	 * Builds a record schema with one nullable field per type, named v0, v1,
+	 * and so on, in the "crunch" namespace.
+	 */
+	private static Schema createTupleSchema(PType<?>... ptypes) {
+		// A random UUID (with '-' replaced, since it is not legal in Avro
+		// names) gives every tuple schema a globally unique record name.
+		String uniqueSuffix = UUID.randomUUID().toString().replace('-', 'x');
+		Schema schema = Schema.createRecord("tuple" + uniqueSuffix, "",
+				"crunch", false);
+		List<Schema.Field> fields = Lists.newArrayList();
+		int position = 0;
+		for (PType<?> ptype : ptypes) {
+			Schema fieldSchema = allowNulls(((AvroType) ptype).getSchema());
+			fields.add(new Schema.Field("v" + position, fieldSchema, "", null));
+			position++;
+		}
+		schema.setFields(fields);
+		return schema;
+	}
+
+	/**
+	 * Creates a type exposed as {@code clazz} but stored with {@code base}'s
+	 * schema, composing the given conversions with those of the base type.
+	 */
+	public static final <S, T> AvroType<T> derived(Class<T> clazz,
+			MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
+		AvroType<S> abase = (AvroType<S>) base;
+		Schema schema = abase.getSchema();
+		MapFn readFn = new CompositeMapFn(abase.getInputMapFn(), inputFn);
+		MapFn writeFn = new CompositeMapFn(outputFn, abase.getOutputMapFn());
+		PType[] subTypes = base.getSubTypes().toArray(new PType[0]);
+		return new AvroType<T>(clazz, schema, readFn, writeFn, subTypes);
+	}
+
+	/**
+	 * Delegates to PTypes.jsonString with the Avro type family (presumably a
+	 * JSON-string encoding of {@code clazz} instances).
+	 */
+	public static <T> PType<T> jsons(Class<T> clazz) {
+		return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
+	}
+
+	/** Creates a table (Pair-based) type with the given key and value types. */
+	public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key,
+			PType<V> value) {
+		return new AvroTableType((AvroType<K>) key, (AvroType<V>) value,
+				Pair.class);
+	}
+
+	private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
+
+	/**
+	 * Returns a union of {@code base} and null. A null schema is returned
+	 * unchanged, since a union may not contain the null type twice.
+	 */
+	private static Schema allowNulls(Schema base) {
+		return NULL_SCHEMA.equals(base) ? base : Schema.createUnion(
+				ImmutableList.of(base, NULL_SCHEMA));
+	}
+
+	/** Static factory holder; not instantiable. */
+	private Avros() {
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java b/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
index 9739d17..390fbf1 100644
--- a/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
+++ b/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
@@ -14,15 +14,12 @@
  */
 package com.cloudera.crunch.io.avro;
 
-import static com.google.common.io.Resources.getResource;
-import static com.google.common.io.Resources.newInputStreamSupplier;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 
@@ -31,6 +28,7 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,17 +39,15 @@ import org.junit.Test;
 import com.cloudera.crunch.test.Person;
 import com.cloudera.crunch.types.avro.Avros;
 import com.google.common.collect.Lists;
-import com.google.common.io.InputSupplier;
 
 public class AvroFileReaderFactoryTest {
 
 	private File avroFile;
-	private Schema schema;
 
 	@Before
 	public void setUp() throws IOException {
-		InputSupplier<InputStream> inputStreamSupplier = newInputStreamSupplier(getResource("person.avro"));
-		schema = new Schema.Parser().parse(inputStreamSupplier.getInput());
+		// Each test writes its own records, with its own schema, to this file.
 		avroFile = File.createTempFile("test", ".av");
 	}
 
@@ -60,15 +56,15 @@ public class AvroFileReaderFactoryTest {
 		avroFile.delete();
 	}
 
-	private void populateGenericFile(List<GenericRecord> genericRecords)
-			throws IOException {
+	private void populateGenericFile(List<GenericRecord> genericRecords,
+			Schema outputSchema) throws IOException {
 		FileOutputStream outputStream = new FileOutputStream(this.avroFile);
 		GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
-				schema);
+				outputSchema);
 
 		DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
 				genericDatumWriter);
-		dataFileWriter.create(schema, outputStream);
+		dataFileWriter.create(outputSchema, outputStream);
 
 		for (GenericRecord record : genericRecords) {
 			dataFileWriter.append(record);
@@ -81,17 +77,17 @@ public class AvroFileReaderFactoryTest {
 
 	@Test
 	public void testRead_GenericReader() throws IOException {
-		GenericRecord savedRecord = new GenericData.Record(schema);
+		GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
 		savedRecord.put("name", "John Doe");
 		savedRecord.put("age", 42);
 		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord));
+		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
 
 		AvroFileReaderFactory<GenericData.Record> genericReader = new AvroFileReaderFactory<GenericData.Record>(
-				Avros.generics(schema), new Configuration());
+				Avros.generics(Person.SCHEMA$), new Configuration());
 		Iterator<GenericData.Record> recordIterator = genericReader.read(
-				FileSystem.getLocal(new Configuration()),
-				new Path(this.avroFile.getAbsolutePath()));
+				FileSystem.getLocal(new Configuration()), new Path(
+						this.avroFile.getAbsolutePath()));
 
 		GenericRecord genericRecord = recordIterator.next();
 		assertEquals(savedRecord, genericRecord);
@@ -100,16 +96,16 @@ public class AvroFileReaderFactoryTest {
 
 	@Test
 	public void testRead_SpecificReader() throws IOException {
-		GenericRecord savedRecord = new GenericData.Record(schema);
+		GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
 		savedRecord.put("name", "John Doe");
 		savedRecord.put("age", 42);
 		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord));
+		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
 
 		AvroFileReaderFactory<Person> genericReader = new AvroFileReaderFactory<Person>(
 				Avros.records(Person.class), new Configuration());
-		Iterator<Person> recordIterator = genericReader.read(
-				FileSystem.getLocal(new Configuration()),
+		Iterator<Person> recordIterator = genericReader.read(FileSystem
+				.getLocal(new Configuration()),
 				new Path(this.avroFile.getAbsolutePath()));
 
 		Person expectedPerson = new Person();
@@ -125,4 +121,35 @@ public class AvroFileReaderFactoryTest {
 		assertEquals(expectedPerson, person);
 		assertFalse(recordIterator.hasNext());
 	}
+
+	@Test
+	public void testRead_ReflectReader() throws IOException {
+		// Write a record with the POJO's reflect-derived schema, then read it
+		// back through a reader built from Avros.reflects(...).
+		Schema reflectSchema = ReflectData.get().getSchema(PojoPerson.class);
+		GenericRecord savedRecord = new GenericData.Record(reflectSchema);
+		savedRecord.put("name", "John Doe");
+		populateGenericFile(Lists.newArrayList(savedRecord), reflectSchema);
+
+		AvroFileReaderFactory<PojoPerson> reflectReader = new AvroFileReaderFactory<PojoPerson>(
+				Avros.reflects(PojoPerson.class), new Configuration());
+		FileSystem localFs = FileSystem.getLocal(new Configuration());
+		Iterator<PojoPerson> people = reflectReader.read(localFs, new Path(
+				this.avroFile.getAbsolutePath()));
+
+		PojoPerson person = people.next();
+		assertEquals("John Doe", person.getName());
+		assertFalse(people.hasNext());
+	}
+
+	/** Plain Java bean with no generated Avro code, used by the reflect tests. */
+	public static class PojoPerson {
+		private String name;
+
+		public String getName() {
+			return this.name;
+		}
+
+		public void setName(String name) {
+			this.name = name;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java b/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
index afe22f9..ce43a98 100644
--- a/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
+++ b/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,6 +37,7 @@ import com.cloudera.crunch.PCollection;
 import com.cloudera.crunch.Pipeline;
 import com.cloudera.crunch.impl.mr.MRPipeline;
 import com.cloudera.crunch.io.At;
+import com.cloudera.crunch.io.avro.AvroFileReaderFactoryTest.PojoPerson;
 import com.cloudera.crunch.test.Person;
 import com.cloudera.crunch.types.avro.Avros;
 import com.google.common.collect.Lists;
@@ -55,14 +57,15 @@ public class AvroFileSourceTargetTest implements Serializable {
 		avroFile.delete();
 	}
 
-	private void populateGenericFile(List<GenericRecord> genericRecords) throws IOException {
+	private void populateGenericFile(List<GenericRecord> genericRecords,
+			Schema schema) throws IOException {
 		FileOutputStream outputStream = new FileOutputStream(this.avroFile);
 		GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
-				Person.SCHEMA$);
+				schema);
 
 		DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
 				genericDatumWriter);
-		dataFileWriter.create(Person.SCHEMA$, outputStream);
+		dataFileWriter.create(schema, outputStream);
 
 		for (GenericRecord record : genericRecords) {
 			dataFileWriter.append(record);
@@ -79,13 +82,14 @@ public class AvroFileSourceTargetTest implements Serializable {
 		savedRecord.put("name", "John Doe");
 		savedRecord.put("age", 42);
 		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord));
+		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
 
 		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
-		PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
-				Avros.records(Person.class)));
+		PCollection<Person> genericCollection = pipeline.read(At.avroFile(
+				avroFile.getAbsolutePath(), Avros.records(Person.class)));
 
-		List<Person> personList = Lists.newArrayList(genericCollection.materialize());
+		List<Person> personList = Lists.newArrayList(genericCollection
+				.materialize());
 
 		Person expectedPerson = new Person();
 		expectedPerson.setName("John Doe");
@@ -96,25 +100,51 @@ public class AvroFileSourceTargetTest implements Serializable {
 		siblingNames.add("Jane");
 		expectedPerson.setSiblingnames(siblingNames);
 
-		assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
+		assertEquals(Lists.newArrayList(expectedPerson),
+				Lists.newArrayList(personList));
 	}
 
 	@Test
 	public void testGeneric() throws IOException {
-		String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
-		Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+		String genericSchemaJson = Person.SCHEMA$.toString().replace("Person",
+				"GenericPerson");
+		Schema genericPersonSchema = new Schema.Parser()
+				.parse(genericSchemaJson);
+		// The record is renamed, presumably so that no generated Person class
+		// matches the schema and the data is read back as GenericData.Record.
 		GenericRecord savedRecord = new GenericData.Record(genericPersonSchema);
 		savedRecord.put("name", "John Doe");
 		savedRecord.put("age", 42);
 		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord));
+		populateGenericFile(Lists.newArrayList(savedRecord),
+				genericPersonSchema);
 
 		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
-		PCollection<Record> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
-				Avros.generics(genericPersonSchema)));
+		PCollection<Record> genericCollection = pipeline
+				.read(At.avroFile(avroFile.getAbsolutePath(),
+						Avros.generics(genericPersonSchema)));
 
-		List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
+		List<Record> recordList = Lists.newArrayList(genericCollection
+				.materialize());
 
-		assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
+		assertEquals(Lists.newArrayList(savedRecord),
+				Lists.newArrayList(recordList));
+	}
+
+	@Test
+	public void testReflect() throws IOException {
+		// Round-trip a record written with the POJO's reflect-derived schema
+		// through a pipeline that reads it via Avros.reflects(...).
+		Schema pojoPersonSchema = ReflectData.get().getSchema(PojoPerson.class);
+		GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema);
+		savedRecord.put("name", "John Doe");
+		populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema);
+
+		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
+		String inputPath = avroFile.getAbsolutePath();
+		PCollection<PojoPerson> personCollection = pipeline.read(At.avroFile(
+				inputPath, Avros.reflects(PojoPerson.class)));
+
+		List<PojoPerson> people = Lists.newArrayList(personCollection
+				.materialize());
+
+		assertEquals(1, people.size());
+		assertEquals("John Doe", people.get(0).getName());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java b/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java
new file mode 100644
index 0000000..b0e2e87
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java
@@ -0,0 +1,98 @@
+package com.cloudera.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.MapFn;
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.test.FileHelper;
+import com.cloudera.crunch.types.avro.Avros;
+import com.google.common.collect.Lists;
+
+/**
+ * Checks that plain POJOs (no generated Avro code) can be emitted by a
+ * pipeline using {@link Avros#reflects(Class)} and materialized again.
+ */
+public class AvroReflectTest implements Serializable {
+
+	/** Minimal reflect-serializable bean wrapping a single string. */
+	static class StringWrapper {
+		private String value;
+
+		public StringWrapper() {
+			this(null);
+		}
+
+		public StringWrapper(String value) {
+			this.value = value;
+		}
+
+		public String getValue() {
+			return value;
+		}
+
+		public void setValue(String value) {
+			this.value = value;
+		}
+
+		@Override
+		public String toString() {
+			return String.format("<StringWrapper(%s)>", value);
+		}
+
+		@Override
+		public int hashCode() {
+			// Same value as the classic "result = 31 * 1 + h" formulation.
+			return 31 + (value == null ? 0 : value.hashCode());
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj) {
+				return true;
+			}
+			if (obj == null || getClass() != obj.getClass()) {
+				return false;
+			}
+			String otherValue = ((StringWrapper) obj).value;
+			return value == null ? otherValue == null : value
+					.equals(otherValue);
+		}
+	}
+
+	@Test
+	public void testReflection() throws IOException {
+		Pipeline pipeline = new MRPipeline(AvroReflectTest.class);
+		PCollection<String> lines = pipeline.readTextFile(FileHelper
+				.createTempCopyOf("set1.txt"));
+		PCollection<StringWrapper> wrapped = lines.parallelDo(
+				new MapFn<String, StringWrapper>() {
+					@Override
+					public StringWrapper map(String input) {
+						StringWrapper wrapper = new StringWrapper();
+						wrapper.setValue(input);
+						return wrapper;
+					}
+				}, Avros.reflects(StringWrapper.class));
+
+		List<StringWrapper> materialized = Lists.newArrayList(wrapped
+				.materialize());
+
+		pipeline.done();
+
+		List<StringWrapper> expected = Lists.newArrayList(new StringWrapper(
+				"b"), new StringWrapper("c"), new StringWrapper("a"),
+				new StringWrapper("e"));
+		assertEquals(expected, materialized);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
index acfb3e6..d2c2ab9 100644
--- a/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
+++ b/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
@@ -15,23 +15,52 @@ public class AvroTypeTest {
 	}
 
 	@Test
+	public void testIsGeneric_SpecificData() {
+		// A type built by records() for the Person class must not be generic.
+		assertFalse(Avros.records(Person.class).isGeneric());
+	}
+
+	@Test
 	public void testIsSpecific_GenericData() {
 		assertFalse(Avros.generics(Person.SCHEMA$).isSpecific());
 	}
 
 	@Test
+	public void testIsGeneric_GenericData() {
+		assertTrue(Avros.generics(Person.SCHEMA$).isGeneric());
+	}
+
+	@Test
 	public void testIsSpecific_NonAvroClass() {
 		assertFalse(Avros.ints().isSpecific());
 	}
-		 
+
+	@Test
+	public void testIsGeneric_NonAvroClass() {
+		// Primitive types such as ints() report neither generic nor specific.
+		assertFalse(Avros.ints().isGeneric());
+	}
+
 	@Test
 	public void testIsSpecific_SpecificAvroTable() {
-		assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isSpecific());
+		assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class))
+				.isSpecific());
+	}
+
+	@Test
+	public void testIsGeneric_SpecificAvroTable() {
+		// The table tests share a strings() key, so the differing results come
+		// from the value type.
+		assertFalse(Avros.tableOf(Avros.strings(), Avros.records(Person.class))
+				.isGeneric());
 	}
-	
+
 	@Test
 	public void testIsSpecific_GenericAvroTable() {
-		assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isSpecific());
+		assertFalse(Avros.tableOf(Avros.strings(),
+				Avros.generics(Person.SCHEMA$)).isSpecific());
+	}
+
+	@Test
+	public void testIsGeneric_GenericAvroTable() {
+		// A table whose value type is generics(...) is expected to be generic.
+		assertTrue(Avros.tableOf(Avros.strings(),
+				Avros.generics(Person.SCHEMA$)).isGeneric());
 	}
 
 }