Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/02 04:13:36 UTC

git commit: CRUNCH-401: Enable overrides of AvroMode.REFLECT for use with Scrunch.

Repository: crunch
Updated Branches:
  refs/heads/master fcb861edc -> 583ea6dab


CRUNCH-401: Enable overrides of AvroMode.REFLECT for use with Scrunch.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/583ea6da
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/583ea6da
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/583ea6da

Branch: refs/heads/master
Commit: 583ea6dabc457ec69f316b52548b1dc92f99dda1
Parents: fcb861e
Author: Josh Wills <jw...@apache.org>
Authored: Fri May 23 07:05:38 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Jun 1 18:52:42 2014 -0700

----------------------------------------------------------------------
 .../crunch/io/avro/AvroFileReaderFactory.java   | 26 ++++++-----
 .../lib/join/BloomFilterJoinStrategy.java       |  8 ++--
 .../org/apache/crunch/lib/join/JoinUtils.java   |  5 ++-
 .../crunch/lib/sort/ReverseAvroComparator.java  |  5 ++-
 .../crunch/types/avro/AvroDeepCopier.java       |  3 +-
 .../crunch/types/avro/AvroGroupedTableType.java |  2 +-
 .../org/apache/crunch/types/avro/AvroMode.java  | 12 ++---
 .../crunch/types/avro/AvroOutputFormat.java     |  1 +
 .../org/apache/crunch/types/avro/Avros.java     | 46 +++++++++++++-------
 .../crunch/types/avro/ReflectDataFactory.java   |  5 ---
 .../apache/crunch/scrunch/DeepCopyTest.scala    |  2 +-
 .../crunch/scrunch/PageRankClassTest.scala      |  8 ++--
 .../crunch/scrunch/ScalaReflectDataFactory.java | 12 ++---
 .../crunch/scrunch/ScalaSafeReflectData.java    | 21 +++++++--
 .../scrunch/ScalaSafeReflectDatumReader.java    | 30 +++++++------
 .../scrunch/ScalaSafeReflectDatumWriter.java    |  7 +--
 .../org/apache/crunch/scrunch/PCollection.scala | 12 +++--
 .../apache/crunch/scrunch/PCollectionLike.scala |  6 +++
 .../org/apache/crunch/scrunch/PTable.scala      |  5 ++-
 .../org/apache/crunch/scrunch/PTypeFamily.scala |  6 ++-
 .../org/apache/crunch/scrunch/Pipeline.scala    |  1 +
 .../apache/crunch/scrunch/PipelineLike.scala    | 20 +++++++--
 22 files changed, 158 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
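
For context, this change lets a client plug in a custom ReaderWriterFactory for
reflect-based Avro serialization, either through the job Configuration or by
overriding AvroMode.REFLECT directly. The sketch below is illustrative Java that
uses only names appearing in this patch (the "crunch.reflectdatafactory" key,
AvroMode.withFactoryFromConfiguration, and Scrunch's ScalaReflectDataFactory);
the wrapper class and main method are not part of the commit.

  import org.apache.avro.Schema;
  import org.apache.avro.io.DatumReader;
  import org.apache.crunch.scrunch.ScalaReflectDataFactory;
  import org.apache.crunch.types.avro.AvroMode;
  import org.apache.hadoop.conf.Configuration;

  public class ReflectOverrideSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();

      // Point the reflect-mode factory property at a ReaderWriterFactory
      // implementation; this mirrors what PipelineLike.setupConf does in Scrunch.
      conf.set("crunch.reflectdatafactory",
          ScalaReflectDataFactory.class.getCanonicalName());

      // withFactoryFromConfiguration (made public in this commit) resolves the
      // configured factory, so readers and writers come from ScalaSafeReflectData.
      AvroMode mode = AvroMode.REFLECT.withFactoryFromConfiguration(conf);
      Schema schema = Schema.create(Schema.Type.STRING);
      DatumReader<Object> reader = mode.getReader(schema);

      // Alternatively, per the deprecation notes in Avros, the factory can be
      // installed programmatically:
      // AvroMode.REFLECT.override(new ScalaReflectDataFactory());
    }
  }

Either route means the AvroMode.fromType(...) call sites touched in this diff
(AvroFileReaderFactory, BloomFilterJoinStrategy, and the shuffle comparators)
pick up the Scala-safe reflect data instead of the stock ReflectData.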


http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index 5f53a36..5128fd6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -30,6 +30,7 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.impl.AutoClosingIterator;
+import org.apache.crunch.types.avro.AvroMode;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,29 +43,32 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
 
   private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
 
-  private final DatumReader<T> recordReader;
+  private DatumReader<T> reader;
+  private final AvroType<?> atype;
   private final MapFn<T, T> mapFn;
 
-  public AvroFileReaderFactory(AvroType<T> atype) {
-    this(createDatumReader(atype), atype);
+  public AvroFileReaderFactory(Schema schema) {
+    this(null, Avros.generics(schema));
   }
 
-  public AvroFileReaderFactory(DatumReader<T> reader, AvroType<T> atype) {
-    this.recordReader = reader != null ? reader : createDatumReader(atype);
-    this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
+  public AvroFileReaderFactory(AvroType<?> atype) {
+    this(null, atype);
   }
 
-  public AvroFileReaderFactory(Schema schema) {
-    this.recordReader = Avros.newReader(schema);
-    this.mapFn = IdentityFn.<T>getInstance();
+  public AvroFileReaderFactory(DatumReader<T> reader, AvroType<?> atype) {
+    this.reader = reader;
+    this.atype = atype;
+    this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
   }
 
-  static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {
-    return Avros.newReader(avroType);
+  static <T> DatumReader<T> createDatumReader(AvroType<T> atype) {
+    return Avros.newReader(atype);
   }
 
   @Override
   public Iterator<T> read(FileSystem fs, final Path path) {
+    AvroMode mode = AvroMode.fromType(atype).withFactoryFromConfiguration(fs.getConf());
+    final DatumReader recordReader = reader == null ? mode.getReader(atype.getSchema()) : reader;
     this.mapFn.initialize();
     try {
       FsInput fsi = new FsInput(path, fs.getConf());

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
index 872f3e3..69fe27e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.commons.logging.Log;
@@ -35,9 +36,9 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.ReadableData;
-import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroMode;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
@@ -304,11 +305,12 @@ public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
     
     private AvroType<T> ptype;
     private BinaryEncoder encoder;
-    private ReflectDatumWriter datumWriter;
+    private DatumWriter datumWriter;
     
     AvroToBytesFn(AvroType<T> ptype, Configuration conf) {
       this.ptype = ptype;
-      datumWriter = Avros.getReflectDataFactory(conf).getWriter(ptype.getSchema());
+      datumWriter = AvroMode.fromType(ptype).withFactoryFromConfiguration(conf)
+          .getWriter(ptype.getSchema());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
index b4829be..02963a7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
@@ -26,6 +26,7 @@ import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroMode;
 import org.apache.crunch.types.writable.TupleWritable;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.hadoop.conf.Configuration;
@@ -108,6 +109,7 @@ public class JoinUtils {
 
   public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> {
     private Schema schema;
+    private AvroMode mode;
 
     @Override
     public void setConf(Configuration conf) {
@@ -116,12 +118,13 @@ public class JoinUtils {
         Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf);
         Schema keySchema = org.apache.avro.mapred.Pair.getKeySchema(mapOutputSchema);
         schema = keySchema.getFields().get(0).schema();
+        mode = AvroMode.fromShuffleConfiguration(conf);
       }
     }
 
     @Override
     public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
-      return ReflectData.get().compare(x.datum(), y.datum(), schema);
+      return mode.getData().compare(x.datum(), y.datum(), schema);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java
index c404492..b94ccc2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java
@@ -21,6 +21,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.types.avro.AvroMode;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.RawComparator;
@@ -28,18 +29,20 @@ import org.apache.hadoop.io.RawComparator;
 public class ReverseAvroComparator<T> extends Configured implements RawComparator<AvroKey<T>> {
 
   private Schema schema;
+  private AvroMode mode;
 
   @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
     if (conf != null) {
       schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
+      mode = AvroMode.fromShuffleConfiguration(conf);
     }
   }
 
   @Override
   public int compare(AvroKey<T> o1, AvroKey<T> o2) {
-    return ReflectData.get().compare(o2.datum(), o1.datum(), schema);
+    return mode.getData().compare(o2.datum(), o1.datum(), schema);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 56ec459..4a98228 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -153,8 +153,7 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
 
     @Override
     protected DatumReader<T> createDatumReader(Configuration conf) {
-      AvroMode.REFLECT.configureFactory(conf);
-      return AvroMode.REFLECT.getReader(getSchema());
+      return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getReader(getSchema());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index a97f917..7178274 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -101,7 +101,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
       options.configure(job);
     }
 
-    AvroMode.fromType(att).configureShuffle(conf);
+    AvroMode.fromType(att).withFactoryFromConfiguration(conf).configureShuffle(conf);
 
     Collection<String> serializations = job.getConfiguration().getStringCollection(
         "io.serializations");

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
index 90ff791..3d7fbfa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
@@ -51,12 +51,12 @@ public class AvroMode implements ReaderWriterFactory {
   /**
    * Default mode to use for reading and writing {@link ReflectData Reflect} types.
    */
-  public static final AvroMode REFLECT  = new AvroMode(ModeType.REFLECT, new ReflectDataFactory(), Avros.REFLECT_DATA_FACTORY_CLASS);
+  public static final AvroMode REFLECT = new AvroMode(ModeType.REFLECT, Avros.REFLECT_DATA_FACTORY_CLASS);
 
   /**
    * Default mode to use for reading and writing {@link SpecificData Specific} types.
    */
-  public static final AvroMode SPECIFIC =new AvroMode(ModeType.SPECIFIC, "crunch.specificfactory");
+  public static final AvroMode SPECIFIC = new AvroMode(ModeType.SPECIFIC, "crunch.specificfactory");
   /**
    * Default mode to use for reading and writing {@link GenericData Generic} types.
    */
@@ -95,11 +95,11 @@ public class AvroMode implements ReaderWriterFactory {
       if (type.hasSpecific()) {
         Avros.checkCombiningSpecificAndReflectionSchemas();
       }
-      return AvroMode.REFLECT;
+      return REFLECT;
     } else if (type.hasSpecific()) {
-      return AvroMode.SPECIFIC;
+      return SPECIFIC;
     } else {
-      return AvroMode.GENERIC;
+      return GENERIC;
     }
   }
 
@@ -352,7 +352,7 @@ public class AvroMode implements ReaderWriterFactory {
   }
 
   @SuppressWarnings("unchecked")
-  AvroMode withFactoryFromConfiguration(Configuration conf) {
+  public AvroMode withFactoryFromConfiguration(Configuration conf) {
     // although the shuffle and input/output use different properties for mode,
     // this is shared - only one ReaderWriterFactory can be used.
     Class<?> factoryClass = conf.getClass(propName, this.getClass());

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 5a4499d..6dbb6de 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.mapred.AvroJob;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 1fcb30e..62945b1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -100,14 +100,11 @@ public class Avros {
    *
    * @deprecated as of 0.9.0; use AvroMode.REFLECT.override(ReaderWriterFactory)
    */
-  public static ReflectDataFactory REFLECT_DATA_FACTORY =
-      (ReflectDataFactory) AvroMode.REFLECT.getFactory();
+  public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
 
   /**
    * The name of the configuration parameter that tracks which reflection
    * factory to use.
-   *
-   * @deprecated as of 0.9.0; use AvroMode.REFLECT.override(ReaderWriterFactory)
    */
   public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
 
@@ -273,7 +270,11 @@ public class Avros {
   }
 
   public static final <T> AvroType<T> reflects(Class<T> clazz) {
-    Schema schema = REFLECT_DATA_FACTORY.getData().getSchema(clazz);
+    Schema schema = ((ReflectData) AvroMode.REFLECT.getData()).getSchema(clazz);
+    return reflects(clazz, schema);
+  }
+
+  public static final <T> AvroType<T> reflects(Class<T> clazz, Schema schema) {
     return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema));
   }
 
@@ -541,6 +542,7 @@ public class Avros {
     private final String jsonSchema;
     private final boolean isReflect;
     private transient Schema schema;
+    private transient AvroMode mode;
 
     public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
       this.fns = Lists.newArrayList();
@@ -585,11 +587,16 @@ public class Avros {
       for (MapFn fn : fns) {
         fn.initialize();
       }
+      if (getConfiguration() != null) {
+        mode = AvroMode.REFLECT.withFactoryFromConfiguration(getConfiguration());
+      } else {
+        mode = AvroMode.REFLECT;
+      }
     }
 
     private GenericRecord createRecord() {
       if (isReflect) {
-        return new ReflectGenericRecord(schema);
+        return new ReflectGenericRecord(schema, mode);
       } else {
         return new GenericData.Record(schema);
       }
@@ -693,6 +700,7 @@ public class Avros {
     private final String jsonSchema;
     private final boolean isReflect;
     private transient Schema schema;
+    private transient AvroMode mode;
 
     public TupleToUnionRecord(Schema schema, PType<?>... ptypes) {
       this.fns = Lists.newArrayList();
@@ -737,11 +745,16 @@ public class Avros {
       for (MapFn fn : fns) {
         fn.initialize();
       }
+      if (getConfiguration() != null) {
+        mode = AvroMode.REFLECT.withFactoryFromConfiguration(getConfiguration());
+      } else {
+        mode = AvroMode.REFLECT;
+      }
     }
 
     private GenericRecord createRecord() {
       if (isReflect) {
-        return new ReflectGenericRecord(schema);
+        return new ReflectGenericRecord(schema, mode);
       } else {
         return new GenericData.Record(schema);
       }
@@ -850,20 +863,23 @@ public class Avros {
 
   private static class ReflectGenericRecord extends GenericData.Record {
 
-    public ReflectGenericRecord(Schema schema) {
+    private AvroMode mode;
+
+    public ReflectGenericRecord(Schema schema, AvroMode mode) {
       super(schema);
+      this.mode = mode;
     }
 
     @Override
     public int hashCode() {
-      return reflectAwareHashCode(this, getSchema());
+      return reflectAwareHashCode(this, getSchema(), mode);
     }
   }
 
   /*
    * TODO: Remove this once we no longer have to support 1.5.4.
    */
-  private static int reflectAwareHashCode(Object o, Schema s) {
+  private static int reflectAwareHashCode(Object o, Schema s, AvroMode mode) {
     if (o == null)
       return 0; // incomplete datum
     int hashCode = 1;
@@ -872,17 +888,17 @@ public class Avros {
       for (Schema.Field f : s.getFields()) {
         if (f.order() == Schema.Field.Order.IGNORE)
           continue;
-        hashCode = hashCodeAdd(hashCode, ReflectData.get().getField(o, f.name(), f.pos()), f.schema());
+        hashCode = hashCodeAdd(hashCode, mode.getData().getField(o, f.name(), f.pos()), f.schema(), mode);
       }
       return hashCode;
     case ARRAY:
       Collection<?> a = (Collection<?>) o;
       Schema elementType = s.getElementType();
       for (Object e : a)
-        hashCode = hashCodeAdd(hashCode, e, elementType);
+        hashCode = hashCodeAdd(hashCode, e, elementType, mode);
       return hashCode;
     case UNION:
-      return reflectAwareHashCode(o, s.getTypes().get(ReflectData.get().resolveUnion(s, o)));
+      return reflectAwareHashCode(o, s.getTypes().get(mode.getData().resolveUnion(s, o)), mode);
     case ENUM:
       return s.getEnumOrdinal(o.toString());
     case NULL:
@@ -895,8 +911,8 @@ public class Avros {
   }
 
   /** Add the hash code for an object into an accumulated hash code. */
-  private static int hashCodeAdd(int hashCode, Object o, Schema s) {
-    return 31 * hashCode + reflectAwareHashCode(o, s);
+  private static int hashCodeAdd(int hashCode, Object o, Schema s, AvroMode mode) {
+    return 31 * hashCode + reflectAwareHashCode(o, s, mode);
   }
 
   private Avros() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
index 3a5d6f4..65c1f49 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
@@ -32,11 +32,6 @@ public class ReflectDataFactory implements ReaderWriterFactory {
     return ReflectData.AllowNull.get();
   }
 
-  // for backwards-compatibility
-  public ReflectData getReflectData() {
-    return getData();
-  }
-
   @Override
   public <T> ReflectDatumReader<T> getReader(Schema schema) {
     return new ReflectDatumReader<T>(schema);

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
index c931754..816993b 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
@@ -73,7 +73,7 @@ class DeepCopyTest extends CrunchSuite {
   @SuppressWarnings(Array("rawtypes", "unchecked"))
   private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) {
     val r: AnyRef = records.iterator.next()
-    val schema = new ScalaReflectDataFactory().getReflectData.getSchema(r.getClass)
+    val schema = new ScalaReflectDataFactory().getData.getSchema(r.getClass)
 
     val writer = new ScalaReflectDataFactory().getWriter[T](schema)
     val dataFileWriter = new DataFileWriter(writer)

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
index 3d3cb9f..f7ccf1a 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
@@ -27,12 +27,12 @@ import scala.collection.mutable.HashMap
 import _root_.org.junit.Assert._
 import _root_.org.junit.Test
 
-case class PageRankData(page_rank: Float, oldpr: Float, urls: Array[String]) {
-  def this() = this(0f, 0f, null)
+case class PageRankData(page_rank: Float, oldpr: Float, urls: Array[String], bytes: Array[Byte]) {
+  def this() = this(0f, 0f, null, Array[Byte](0))
 
   def scaledPageRank = page_rank / urls.length
 
-  def next(newPageRank: Float) = new PageRankData(newPageRank, page_rank, urls)
+  def next(newPageRank: Float) = new PageRankData(newPageRank, page_rank, urls, bytes)
 
   def delta = math.abs(page_rank - oldpr)
 }
@@ -67,7 +67,7 @@ class PageRankClassTest extends CrunchSuite {
     pipeline.read(from.textFile(fileName, Avros.strings))
       .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
       .groupByKey
-      .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray)))
+      .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray, Array[Byte](0))))
   }
 
   def update(prev: PTable[String, PageRankData], d: Float) = {

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
index e3d4eb2..7fd962b 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
@@ -22,22 +22,24 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 
+import org.apache.crunch.types.avro.ReaderWriterFactory;
 import org.apache.crunch.types.avro.ReflectDataFactory;
 
 /**
  * An implementation of the {@code ReflectDataFactory} class to work with Scala classes.
  */
-public class ScalaReflectDataFactory extends ReflectDataFactory {
+public class ScalaReflectDataFactory implements ReaderWriterFactory {
 
   @Override
-  public ReflectData getReflectData() { return ScalaSafeReflectData.get(); }
+  public ReflectData getData() { return ScalaSafeReflectData.getInstance(); }
   
   @Override
   public <T> ReflectDatumReader<T> getReader(Schema schema) {
     return new ScalaSafeReflectDatumReader<T>(schema);
   }
-  
-  public <T> ReflectDatumWriter<T> getWriter() {
-    return new ScalaSafeReflectDatumWriter<T>();
+
+  @Override
+  public <T> ReflectDatumWriter<T> getWriter(Schema schema) {
+    return new ScalaSafeReflectDatumWriter<T>(schema);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
index 6118834..6885f3e 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
@@ -48,7 +48,7 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull {
 
   private static final ScalaSafeReflectData INSTANCE = new ScalaSafeReflectData();
   
-  public static ScalaSafeReflectData get() { return INSTANCE; }
+  public static ScalaSafeReflectData getInstance() { return INSTANCE; }
   
   static final String CLASS_PROP = "java-class";
   static final String ELEMENT_PROP = "java-element-class";
@@ -88,7 +88,7 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull {
   protected Schema createSchema(Type type, Map<String,Schema> names) {
     if (type instanceof GenericArrayType) {                  // generic array
       Type component = ((GenericArrayType)type).getGenericComponentType();
-      if (component == Byte.TYPE)                            // byte array
+      if (component == Byte.TYPE)                           // byte array
         return Schema.create(Schema.Type.BYTES);           
       Schema result = Schema.createArray(createSchema(component, names));
       setElement(result, component);
@@ -127,8 +127,11 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull {
         return super.createSchema(type, names);
       if (c.isArray()) {                                     // array
         Class component = c.getComponentType();
-        if (component == Byte.TYPE)                          // byte array
-          return Schema.create(Schema.Type.BYTES);
+        if (component == Byte.TYPE) {                        // byte array
+          Schema result = Schema.create(Schema.Type.BYTES);
+          result.addProp(CLASS_PROP, c.getName()); // For scala-specific byte arrays
+          return result;
+        }
         Schema result = Schema.createArray(createSchema(component, names));
         setElement(result, component);
         return result;
@@ -289,4 +292,14 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull {
   protected boolean isMap(Object datum) {
     return (datum instanceof java.util.Map) || (datum instanceof scala.collection.Map);
   }
+
+  @Override
+  protected String getSchemaName(Object datum) {
+    if (datum != null) {
+      if(byte[].class.isAssignableFrom(datum.getClass())) {
+        return Schema.Type.BYTES.getName();
+      }
+    }
+    return super.getSchemaName(datum);
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
index 123b45e..bbe7305 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import com.google.common.collect.Lists;
 import org.apache.avro.Schema;
 import org.apache.avro.io.ResolvingDecoder;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -34,7 +35,7 @@ import scala.collection.JavaConversions;
 public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
   
   public ScalaSafeReflectDatumReader(Schema schema) {
-    super(schema, schema, ScalaSafeReflectData.get());
+    super(schema, schema, ScalaSafeReflectData.getInstance());
   }
   
   @Override
@@ -100,25 +101,28 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
   @Override
   @SuppressWarnings("unchecked")
   protected Object newArray(Object old, int size, Schema schema) {
-    ScalaSafeReflectData data = ScalaSafeReflectData.get();
-    Class collectionClass = ScalaSafeReflectData.getClassProp(schema,
-        ScalaSafeReflectData.CLASS_PROP);
-    if (collectionClass != null) {
+    Class collectionClass =
+        ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.CLASS_PROP);
+    Class elementClass =
+        ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.ELEMENT_PROP);
+
+    if (collectionClass == null && elementClass == null)
+      return super.newArray(old, size, schema);   // use specific/generic
+
+    ScalaSafeReflectData data = ScalaSafeReflectData.getInstance();
+    if (collectionClass != null && !collectionClass.isArray()) {
       if (old instanceof Collection) {
         ((Collection)old).clear();
         return old;
       }
       if (scala.collection.Iterable.class.isAssignableFrom(collectionClass) ||
-          collectionClass.isAssignableFrom(ArrayList.class)) {
-        return Lists.newArrayList();
-      }
-      return ReflectionUtils.newInstance(collectionClass, null);
+          collectionClass.isAssignableFrom(ArrayList.class))
+        return new ArrayList();
+      return data.newInstance(collectionClass, schema);
     }
-    Class elementClass = ScalaSafeReflectData.getClassProp(schema,
-        ScalaSafeReflectData.ELEMENT_PROP);
-    if (elementClass == null) {
+
+    if (elementClass == null)
       elementClass = data.getClass(schema.getElementType());
-    }
     return Array.newInstance(elementClass, size);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java
index a19d6fb..1ac768c 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java
@@ -20,6 +20,7 @@ package org.apache.crunch.scrunch;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.avro.Schema;
 import org.apache.avro.reflect.ReflectDatumWriter;
 
 import scala.collection.JavaConversions;
@@ -28,10 +29,10 @@ import scala.collection.JavaConversions;
  *
  */
 public class ScalaSafeReflectDatumWriter<T> extends ReflectDatumWriter<T> {
-  public ScalaSafeReflectDatumWriter() {
-    super(ScalaSafeReflectData.get());
+  public ScalaSafeReflectDatumWriter(Schema schema) {
+    super(schema, ScalaSafeReflectData.getInstance());
   }
-  
+
   @Override
   protected long getArraySize(Object array) {
     if (array instanceof scala.collection.Iterable) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index dc0ab0b..31c2f8a 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -69,7 +69,7 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
   }
 
   def materialize() = {
-    InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
+    setupRun()
     JavaConversions.iterableAsScalaIterable[S](native.materialize)
   }
 
@@ -86,9 +86,15 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
     count.mapValues(_.longValue())
   }
 
-  def max()(implicit converter: Converter[S, S]) = PObject(Aggregate.max(native))(converter)
+  def max()(implicit converter: Converter[S, S]) = {
+    setupRun()
+    PObject(Aggregate.max(native))(converter)
+  }
 
-  def min()(implicit converter: Converter[S, S]) = PObject(Aggregate.min(native))(converter)
+  def min()(implicit converter: Converter[S, S]) = {
+    setupRun()
+    PObject(Aggregate.min(native))(converter)
+  }
 
   def sample(acceptanceProbability: Double) = {
     wrap(Sample.sample(native, acceptanceProbability))

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index 1e2e890..35b90be 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -41,6 +41,10 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
   protected def wrapPairFlatMapFn[K, V](fmt: FunctionType[TraversableOnce[(K, V)]]): DoFn[S, JPair[K, V]]
   protected def wrapPairMapFn[K, V](fmt: FunctionType[(K, V)]): MapFn[S, JPair[K, V]]
 
+  protected def setupRun() {
+    PipelineLike.setupConf(native.getPipeline().getConfiguration())
+  }
+
   /**
    * Returns the underlying PCollection wrapped by this instance.
    */
@@ -254,6 +258,7 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
    * @return The number of elements in this PCollection.
    */
   def length(): PObject[Long] = {
+    setupRun()
     PObject(native.length())
   }
 
@@ -262,6 +267,7 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
    * @return
    */
   def asSeq(): PObject[Seq[S]] = {
+    setupRun()
     PObject(native.asCollection())
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 1d5a70e..aefad67 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -155,16 +155,17 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
   def incrementIfValue(f: V => Boolean) = new IncrementIfPTable[K, V](this, incValueFn(f))
 
   def materialize(): Iterable[(K, V)] = {
-    InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
+    setupRun()
     native.materialize.view.map(x => (x.first, x.second))
   }
 
   def materializeToMap(): Map[K, V] = {
-    InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
+    setupRun()
     native.materializeToMap().view.toMap
   }
 
   def asMap(): PObject[Map[K, V]] = {
+    setupRun()
     PObject(native.asMap())
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index 394e2ac..aadb026 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -178,7 +178,9 @@ object Avros extends PTypeFamily {
     CAvros.specifics(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
   }
 
-  def reflects[T: ClassTag]() = {
-    CAvros.reflects(implicitly[ClassTag[T]].runtimeClass).asInstanceOf[AvroType[T]]
+  def reflects[T: ClassTag](): AvroType[T] = {
+    val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+    val schema = ScalaSafeReflectData.getInstance().getSchema(clazz)
+    CAvros.reflects(clazz, schema)
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
index 67c9b14..a72dc7a 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
@@ -53,6 +53,7 @@ import org.apache.crunch.types.{PTableType, PType}
  * }}}
  */
 class Pipeline(val jpipeline: JPipeline) extends PipelineLike {
+
   /**
    * A convenience method for reading a text file.
    *

http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index 27c43a7..b800612 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -26,6 +26,9 @@ import org.apache.crunch.types.{PTableType, PType}
 trait PipelineLike {
   def jpipeline: JPipeline
 
+  // Call this to ensure we set this up before any subsequent calls to the system
+  PipelineLike.setupConf(getConfiguration())
+
   /**
    * Gets the configuration object associated with this pipeline.
    */
@@ -110,12 +113,13 @@ trait PipelineLike {
    */
   def emptyPTable[K, V](pt: PTableType[K, V]) = new PTable[K, V](jpipeline.emptyPTable(pt))
 
+
   /**
    * Returns a handler for controlling the execution of the underlying MapReduce
    * pipeline.
    */
   def runAsync(): PipelineExecution = {
-    InterpreterRunner.addReplJarsToJob(getConfiguration())
+    PipelineLike.setupConf(getConfiguration())
     jpipeline.runAsync()
   }
 
@@ -124,7 +128,7 @@ trait PipelineLike {
    * to write data to the output targets.
    */
   def run(): PipelineResult = {
-    InterpreterRunner.addReplJarsToJob(getConfiguration())
+    PipelineLike.setupConf(getConfiguration())
     jpipeline.run()
   }
 
@@ -134,7 +138,7 @@ trait PipelineLike {
    * this run or previous calls to `run`.
    */
   def done(): PipelineResult =  {
-    InterpreterRunner.addReplJarsToJob(getConfiguration())
+    PipelineLike.setupConf(getConfiguration())
     jpipeline.done()
   }
 
@@ -152,3 +156,13 @@ trait PipelineLike {
    */
   def debug(): Unit = jpipeline.enableDebug()
 }
+
+object PipelineLike {
+  def setupConf(conf: Configuration) {
+    InterpreterRunner.addReplJarsToJob(conf)
+    if (conf.get("crunch.reflectdatafactory", "").isEmpty) {
+      // Enables the Scala-specific ReflectDataFactory
+      conf.set("crunch.reflectdatafactory", classOf[ScalaReflectDataFactory].getCanonicalName)
+    }
+  }
+}
\ No newline at end of file