You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by mk...@apache.org on 2013/11/20 20:53:44 UTC

git commit: CRUNCH-293: Add AvroMode to inject avro readers.

Updated Branches:
  refs/heads/master a724ddce5 -> 1381165fb


CRUNCH-293: Add AvroMode to inject avro readers.

AvroMode is an enum that is used to construct DatumReaders and
DatumWriters. The default generic, specific, and reflect implementations
it returns can be overridden cleanly.

Signed-off-by: Micah Whitacre <mi...@cerner.com>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1381165f
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1381165f
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1381165f

Branch: refs/heads/master
Commit: 1381165fb8f6a51b18b78d6bd955a4aa15bfb824
Parents: a724ddc
Author: Ryan Blue <rb...@cloudera.com>
Authored: Fri Nov 15 14:37:52 2013 -0800
Committer: Micah Whitacre <mi...@cerner.com>
Committed: Wed Nov 20 13:26:36 2013 -0600

----------------------------------------------------------------------
 .../org/apache/crunch/impl/mem/MemPipeline.java |   7 +-
 .../crunch/io/avro/AvroFileReaderFactory.java   |  26 +--
 .../apache/crunch/io/avro/AvroFileSource.java   |   2 +
 .../apache/crunch/io/avro/AvroFileTarget.java   |   5 +-
 .../crunch/types/avro/AvroDeepCopier.java       |   5 +-
 .../crunch/types/avro/AvroGroupedTableType.java |   2 +-
 .../org/apache/crunch/types/avro/AvroMode.java  | 162 +++++++++++++++++++
 .../crunch/types/avro/AvroOutputFormat.java     |   6 +-
 .../crunch/types/avro/AvroRecordReader.java     |  14 +-
 .../org/apache/crunch/types/avro/Avros.java     |  54 +++++--
 .../crunch/types/avro/ReaderWriterFactory.java  |  37 +++++
 .../crunch/types/avro/ReflectDataFactory.java   |  12 +-
 .../types/avro/SafeAvroSerialization.java       |  15 +-
 13 files changed, 279 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 5c0f6b0..362763b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.AbstractFuture;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericContainer;
-import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
@@ -51,6 +51,7 @@ import org.apache.crunch.io.seq.SeqFileTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.avro.ReflectDataFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -241,9 +242,9 @@ public class MemPipeline implements Pipeline {
       schema = new ReflectDataFactory().getReflectData().getSchema(r.getClass());
     }
 
-    GenericDatumWriter genericDatumWriter = new GenericDatumWriter(schema);
+    DatumWriter datumWriter = Avros.newWriter(schema);
 
-    DataFileWriter dataFileWriter = new DataFileWriter(genericDatumWriter);
+    DataFileWriter dataFileWriter = new DataFileWriter(datumWriter);
     dataFileWriter.create(schema, outputStream);
 
     for (Object record : genericRecords) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index efde4ec..5f53a36 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -22,12 +22,8 @@ import java.util.Iterator;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.MapFn;
@@ -59,30 +55,12 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
   }
 
   public AvroFileReaderFactory(Schema schema) {
-    this.recordReader = new GenericDatumReader<T>(schema);
+    this.recordReader = Avros.newReader(schema);
     this.mapFn = IdentityFn.<T>getInstance();
   }
 
   static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {
-    if (avroType.hasReflect()) {
-      if (avroType.hasSpecific()) {
-        Avros.checkCombiningSpecificAndReflectionSchemas();
-      }
-      return new ReflectDatumReader<T>(avroType.getSchema());
-    } else if (avroType.hasSpecific()) {
-      // Use the classloader of the avro type as the classloader
-      // for loading SpecificData classes. This is a best-effort
-      // approach to avoid running into AVRO-1240. We can't just
-      // use the SpecificDatumReader(Class) constructor because the
-      // type class here isn't necessarily a SpecificData class, it
-      // might just contain one as a subtype.
-      return new SpecificDatumReader<T>(
-                        avroType.getSchema(),
-                        avroType.getSchema(),
-                        new SpecificData(avroType.getClass().getClassLoader()));
-    } else {
-      return new GenericDatumReader<T>(avroType.getSchema());
-    }
+    return Avros.newReader(avroType);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index a961016..17f47d7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -27,6 +27,7 @@ import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.avro.AvroInputFormat;
+import org.apache.crunch.types.avro.AvroMode;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
         .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
         .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
         .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());
+    AvroMode.fromType(ptype).configure(bundle);
     return bundle;
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index fc82361..68b3b01 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -25,10 +25,9 @@ import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroMode;
 import org.apache.crunch.types.avro.AvroOutputFormat;
 import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
@@ -72,7 +71,7 @@ public class AvroFileTarget extends FileTargetImpl {
       schemaParam = "avro.output.schema." + name;
     }
     bundle.set(schemaParam, atype.getSchema().toString());
-    Avros.configureReflectDataFactory(job.getConfiguration());
+    AvroMode.fromType(atype).configure(bundle);
     configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle,
         outputPath, name);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 367bc9f..9e4b0a1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -152,12 +152,13 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
 
     @Override
     protected DatumReader<T> createDatumReader(Configuration conf) {
-      return Avros.getReflectDataFactory(conf).getReader(getSchema());
+      AvroMode.REFLECT.configure(conf);
+      return AvroMode.REFLECT.getReader(getSchema());
     }
 
     @Override
     protected DatumWriter<T> createDatumWriter(Configuration conf) {
-      return Avros.getReflectDataFactory(conf).getWriter(getSchema());
+      return AvroMode.fromConfiguration(conf).getWriter(getSchema());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index 598868f..62e6db4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -101,7 +101,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
       options.configure(job);
     }
 
-    Avros.configureReflectDataFactory(conf);
+    AvroMode.fromType(att).configure(conf);
 
     Collection<String> serializations = job.getConfiguration().getStringCollection(
         "io.serializations");

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
new file mode 100644
index 0000000..77eece1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public enum AvroMode implements ReaderWriterFactory {
+  REFLECT (new ReflectDataFactory(), Avros.REFLECT_DATA_FACTORY_CLASS),
+  SPECIFIC ("crunch.specificfactory"),
+  GENERIC ("crunch.genericfactory");
+
+  public static final String AVRO_MODE_PROPERTY = "crunch.avro.mode";
+
+  public static AvroMode fromConfiguration(Configuration conf) {
+    AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, GENERIC);
+    mode.setFromConfiguration(conf);
+    return mode;
+  }
+
+  public static AvroMode fromType(AvroType<?> type) {
+    if (type.hasReflect()) {
+      if (type.hasSpecific()) {
+        Avros.checkCombiningSpecificAndReflectionSchemas();
+      }
+      return AvroMode.REFLECT;
+    } else if (type.hasSpecific()) {
+      return AvroMode.SPECIFIC;
+    } else {
+      return AvroMode.GENERIC;
+    }
+  }
+
+  private static ClassLoader specificLoader = null;
+
+  public static void setSpecificClassLoader(ClassLoader loader) {
+    specificLoader = loader;
+  }
+
+  // the factory methods in this class may be overridden in ReaderWriterFactory
+  ReaderWriterFactory factory;
+
+  private final String propName;
+
+  private AvroMode(ReaderWriterFactory factory, String propName) {
+    this.factory = factory;
+    this.propName = propName;
+  }
+
+  private AvroMode(String propName) {
+    this.factory = null;
+    this.propName = propName;
+  }
+
+  public GenericData getData() {
+    if (factory != null) {
+      return factory.getData();
+    }
+
+    switch(this) {
+      case REFLECT:
+        return ReflectData.AllowNull.get();
+      case SPECIFIC:
+        return SpecificData.get();
+      default:
+        return GenericData.get();
+    }
+  }
+
+  public <T> DatumReader<T> getReader(Schema schema) {
+    if (factory != null) {
+      return factory.getReader(schema);
+    }
+
+    switch (this) {
+      case REFLECT:
+        return new ReflectDatumReader<T>(schema);
+      case SPECIFIC:
+        if (specificLoader != null) {
+          return new SpecificDatumReader<T>(
+              schema, schema, new SpecificData(specificLoader));
+        } else {
+          return new SpecificDatumReader<T>(schema);
+        }
+      default:
+        return new GenericDatumReader<T>(schema);
+    }
+  }
+
+  public <T> DatumWriter<T> getWriter(Schema schema) {
+    if (factory != null) {
+      return factory.getWriter(schema);
+    }
+
+    switch (this) {
+      case REFLECT:
+        return new ReflectDatumWriter<T>(schema);
+      case SPECIFIC:
+        return new SpecificDatumWriter<T>(schema);
+      default:
+        return new GenericDatumWriter<T>(schema);
+    }
+  }
+
+  public void override(ReaderWriterFactory factory) {
+    if (factory != this) {
+      this.factory = factory;
+    }
+  }
+
+  public void configure(Configuration conf) {
+    conf.setEnum(AVRO_MODE_PROPERTY, this);
+    if (factory != null) {
+      conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class);
+    }
+  }
+
+  public void configure(FormatBundle bundle) {
+    bundle.set(AVRO_MODE_PROPERTY, this.toString());
+    if (factory != null) {
+      bundle.set(propName, factory.getClass().getName());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  void setFromConfiguration(Configuration conf) {
+    Class<?> factoryClass = conf.getClass(propName, this.getClass());
+    if (factoryClass != this.getClass()) {
+      this.factory = (ReaderWriterFactory)
+          ReflectionUtils.newInstance(factoryClass, conf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 98d3f50..526dabb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -48,8 +48,8 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
       schema = AvroJob.getOutputSchema(context.getConfiguration());
     }
 
-    ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-    final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T> getWriter(schema));
+    final DataFileWriter<T> WRITER = new DataFileWriter<T>(
+        AvroMode.fromConfiguration(conf).<T>getWriter(schema));
 
     JobConf jc = new JobConf(conf);
     /* copied from org.apache.avro.mapred.AvroOutputFormat */
@@ -84,4 +84,4 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
     };
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
index 9c7578c..a5339fb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
@@ -24,10 +24,8 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.AvroJob;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.avro.mapred.FsInput;
-import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -54,13 +52,9 @@ class AvroRecordReader<T> extends RecordReader<AvroWrapper<T>, NullWritable> {
     FileSplit split = (FileSplit) genericSplit;
     Configuration conf = context.getConfiguration();
     SeekableInput in = new FsInput(split.getPath(), conf);
-    DatumReader<T> datumReader = null;
-    if (context.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)) {
-      ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-      datumReader = factory.getReader(schema);
-    } else {
-      datumReader = new SpecificDatumReader<T>(schema);
-    }
+    DatumReader<T> datumReader = AvroMode
+        .fromConfiguration(context.getConfiguration())
+        .getReader(schema);
     this.reader = DataFileReader.openReader(in, datumReader);
     reader.sync(split.getStart()); // sync to start
     this.start = reader.tell();
@@ -111,4 +105,4 @@ class AvroRecordReader<T> extends RecordReader<AvroWrapper<T>, NullWritable> {
   public void close() throws IOException {
     reader.close();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 8b31f39..8d6bdd6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -28,14 +28,14 @@ import java.security.NoSuchAlgorithmException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
@@ -90,24 +90,41 @@ public class Avros {
   }
 
   /**
-   * The instance we use for generating reflected schemas. May be modified by
-   * clients (e.g., Scrunch.)
+   * The instance we use for generating reflected schemas. In releases up to
+   * 0.8.0, this may be modified by clients (e.g., Scrunch.) to override the
+   * reader, writer, and data instances used.
+   *
+   * Configuring the ReaderWriterFactory by setting this field is deprecated.
+   * Instead, use {@link AvroMode#override(ReaderWriterFactory)}.
+   *
+   * @deprecated as of 0.9.0; use AvroMode.REFLECT.override(ReaderWriterFactory)
    */
-  public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
+  public static ReflectDataFactory REFLECT_DATA_FACTORY =
+      (ReflectDataFactory) AvroMode.REFLECT.factory;
 
   /**
    * The name of the configuration parameter that tracks which reflection
    * factory to use.
+   *
+   * @deprecated as of 0.9.0; use AvroMode.REFLECT.override(ReaderWriterFactory)
    */
   public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
 
+  /**
+   * @deprecated as of 0.9.0; use AvroMode.REFLECT.configure(Configuration)
+   */
+  @Deprecated
   public static void configureReflectDataFactory(Configuration conf) {
-    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
+    AvroMode.REFLECT.override(REFLECT_DATA_FACTORY);
+    AvroMode.REFLECT.configure(conf);
   }
 
+  /**
+   * @deprecated as of 0.9.0; use AvroMode.fromConfiguration(conf)
+   */
   public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
-    return (ReflectDataFactory) ReflectionUtils.newInstance(
-        conf.getClass(REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
+    AvroMode.REFLECT.setFromConfiguration(conf);
+    return (ReflectDataFactory) AvroMode.REFLECT.factory;
   }
 
   public static void checkCombiningSpecificAndReflectionSchemas() {
@@ -117,10 +134,27 @@ public class Avros {
           + " Please consider turning your reflection-based type into an avro-generated"
           + " type and using that generated type instead."
           + " If the version of Avro you are using is 1.7.0 or greater, you can enable"
-          + " combined schemas by setting the Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS" + " field to 'true'.");
+          + " combined schemas by setting the Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS"
+          + " field to 'true'.");
     }
   }
 
+  public static <T> DatumReader<T> newReader(Schema schema) {
+    return AvroMode.GENERIC.getReader(schema);
+  }
+
+  public static <T> DatumReader<T> newReader(AvroType<T> type) {
+    return AvroMode.fromType(type).getReader(type.getSchema());
+  }
+
+  public static <T> DatumWriter<T> newWriter(Schema schema) {
+    return AvroMode.GENERIC.getWriter(schema);
+  }
+
+  public static <T> DatumWriter<T> newWriter(AvroType<T> type) {
+    return AvroMode.fromType(type).getWriter(type.getSchema());
+  }
+
   public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
     @Override
     public String map(CharSequence input) {
@@ -236,7 +270,7 @@ public class Avros {
   }
 
   public static final <T> AvroType<T> reflects(Class<T> clazz) {
-    Schema schema = REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz);
+    Schema schema = REFLECT_DATA_FACTORY.getData().getSchema(clazz);
     return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema));
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/types/avro/ReaderWriterFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/ReaderWriterFactory.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReaderWriterFactory.java
new file mode 100644
index 0000000..7f81f9f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReaderWriterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+
+/**
+ * Interface for accessing DatumReader, DatumWriter, and Data classes.
+ */
+public interface ReaderWriterFactory {
+
+  GenericData getData();
+
+  <D> DatumReader<D> getReader(Schema schema);
+
+  <D> DatumWriter<D> getWriter(Schema schema);
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
index e973cca..3a5d6f4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
@@ -25,16 +25,24 @@ import org.apache.avro.reflect.ReflectDatumWriter;
 /**
  * A Factory class for constructing Avro reflection-related objects.
  */
-public class ReflectDataFactory {
+public class ReflectDataFactory implements ReaderWriterFactory {
 
-  public ReflectData getReflectData() {
+  @Override
+  public ReflectData getData() {
     return ReflectData.AllowNull.get();
   }
 
+  // for backwards-compatibility
+  public ReflectData getReflectData() {
+    return getData();
+  }
+
+  @Override
   public <T> ReflectDatumReader<T> getReader(Schema schema) {
     return new ReflectDatumReader<T>(schema);
   }
 
+  @Override
   public <T> ReflectDatumWriter<T> getWriter(Schema schema) {
     return new ReflectDatumWriter<T>(schema);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1381165f/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
index 8bd18b0..7e323f1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -33,14 +33,11 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.avro.mapred.Pair;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
 class SafeAvroSerialization<T> extends Configured implements Serialization<AvroWrapper<T>> {
@@ -61,11 +58,9 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
 
     DatumReader<T> datumReader = null;
     if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
-      ReflectDataFactory factory = (ReflectDataFactory) ReflectionUtils.newInstance(
-          conf.getClass("crunch.reflectdatafactory", ReflectDataFactory.class), conf);
-      datumReader = factory.getReader(schema);
+      datumReader = AvroMode.REFLECT.getReader(schema);
     } else {
-      datumReader = new SpecificDatumReader<T>(schema);
+      datumReader = AvroMode.fromConfiguration(conf).getReader(schema);
     }
     return new AvroWrapperDeserializer(datumReader, isKey);
   }
@@ -110,8 +105,8 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
     Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf) : (AvroKey.class.isAssignableFrom(c) ? Pair
         .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
 
-    ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-    ReflectDatumWriter<T> writer = factory.getWriter(schema);
+    ReaderWriterFactory factory = AvroMode.fromConfiguration(conf);
+    DatumWriter<T> writer = factory.getWriter(schema);
     return new AvroWrapperSerializer(writer);
   }
 
@@ -142,4 +137,4 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
     }
   }
 
-}
\ No newline at end of file
+}