You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/02 04:13:36 UTC
git commit: CRUNCH-401: Enable overrides of AvroMode.REFLECT for use with Scrunch.
Repository: crunch
Updated Branches:
refs/heads/master fcb861edc -> 583ea6dab
CRUNCH-401: Enable overrides of AvroMode.REFLECT for use with Scrunch.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/583ea6da
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/583ea6da
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/583ea6da
Branch: refs/heads/master
Commit: 583ea6dabc457ec69f316b52548b1dc92f99dda1
Parents: fcb861e
Author: Josh Wills <jw...@apache.org>
Authored: Fri May 23 07:05:38 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Jun 1 18:52:42 2014 -0700
----------------------------------------------------------------------
.../crunch/io/avro/AvroFileReaderFactory.java | 26 ++++++-----
.../lib/join/BloomFilterJoinStrategy.java | 8 ++--
.../org/apache/crunch/lib/join/JoinUtils.java | 5 ++-
.../crunch/lib/sort/ReverseAvroComparator.java | 5 ++-
.../crunch/types/avro/AvroDeepCopier.java | 3 +-
.../crunch/types/avro/AvroGroupedTableType.java | 2 +-
.../org/apache/crunch/types/avro/AvroMode.java | 12 ++---
.../crunch/types/avro/AvroOutputFormat.java | 1 +
.../org/apache/crunch/types/avro/Avros.java | 46 +++++++++++++-------
.../crunch/types/avro/ReflectDataFactory.java | 5 ---
.../apache/crunch/scrunch/DeepCopyTest.scala | 2 +-
.../crunch/scrunch/PageRankClassTest.scala | 8 ++--
.../crunch/scrunch/ScalaReflectDataFactory.java | 12 ++---
.../crunch/scrunch/ScalaSafeReflectData.java | 21 +++++++--
.../scrunch/ScalaSafeReflectDatumReader.java | 30 +++++++------
.../scrunch/ScalaSafeReflectDatumWriter.java | 7 +--
.../org/apache/crunch/scrunch/PCollection.scala | 12 +++--
.../apache/crunch/scrunch/PCollectionLike.scala | 6 +++
.../org/apache/crunch/scrunch/PTable.scala | 5 ++-
.../org/apache/crunch/scrunch/PTypeFamily.scala | 6 ++-
.../org/apache/crunch/scrunch/Pipeline.scala | 1 +
.../apache/crunch/scrunch/PipelineLike.scala | 20 +++++++--
22 files changed, 158 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index 5f53a36..5128fd6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -30,6 +30,7 @@ import org.apache.crunch.MapFn;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.io.FileReaderFactory;
import org.apache.crunch.io.impl.AutoClosingIterator;
+import org.apache.crunch.types.avro.AvroMode;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.fs.FileSystem;
@@ -42,29 +43,32 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
- private final DatumReader<T> recordReader;
+ private DatumReader<T> reader;
+ private final AvroType<?> atype;
private final MapFn<T, T> mapFn;
- public AvroFileReaderFactory(AvroType<T> atype) {
- this(createDatumReader(atype), atype);
+ public AvroFileReaderFactory(Schema schema) {
+ this(null, Avros.generics(schema));
}
- public AvroFileReaderFactory(DatumReader<T> reader, AvroType<T> atype) {
- this.recordReader = reader != null ? reader : createDatumReader(atype);
- this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
+ public AvroFileReaderFactory(AvroType<?> atype) {
+ this(null, atype);
}
- public AvroFileReaderFactory(Schema schema) {
- this.recordReader = Avros.newReader(schema);
- this.mapFn = IdentityFn.<T>getInstance();
+ public AvroFileReaderFactory(DatumReader<T> reader, AvroType<?> atype) {
+ this.reader = reader;
+ this.atype = atype;
+ this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
}
- static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {
- return Avros.newReader(avroType);
+ static <T> DatumReader<T> createDatumReader(AvroType<T> atype) {
+ return Avros.newReader(atype);
}
@Override
public Iterator<T> read(FileSystem fs, final Path path) {
+ AvroMode mode = AvroMode.fromType(atype).withFactoryFromConfiguration(fs.getConf());
+ final DatumReader recordReader = reader == null ? mode.getReader(atype.getSchema()) : reader;
this.mapFn.initialize();
try {
FsInput fsi = new FsInput(path, fs.getConf());
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
index 872f3e3..69fe27e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.commons.logging.Log;
@@ -35,9 +36,9 @@ import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.ReadableData;
-import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroMode;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.avro.Avros;
@@ -304,11 +305,12 @@ public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
private AvroType<T> ptype;
private BinaryEncoder encoder;
- private ReflectDatumWriter datumWriter;
+ private DatumWriter datumWriter;
AvroToBytesFn(AvroType<T> ptype, Configuration conf) {
this.ptype = ptype;
- datumWriter = Avros.getReflectDataFactory(conf).getWriter(ptype.getSchema());
+ datumWriter = AvroMode.fromType(ptype).withFactoryFromConfiguration(conf)
+ .getWriter(ptype.getSchema());
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
index b4829be..02963a7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
@@ -26,6 +26,7 @@ import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.reflect.ReflectData;
import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroMode;
import org.apache.crunch.types.writable.TupleWritable;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.apache.hadoop.conf.Configuration;
@@ -108,6 +109,7 @@ public class JoinUtils {
public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> {
private Schema schema;
+ private AvroMode mode;
@Override
public void setConf(Configuration conf) {
@@ -116,12 +118,13 @@ public class JoinUtils {
Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf);
Schema keySchema = org.apache.avro.mapred.Pair.getKeySchema(mapOutputSchema);
schema = keySchema.getFields().get(0).schema();
+ mode = AvroMode.fromShuffleConfiguration(conf);
}
}
@Override
public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
- return ReflectData.get().compare(x.datum(), y.datum(), schema);
+ return mode.getData().compare(x.datum(), y.datum(), schema);
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java b/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java
index c404492..b94ccc2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/sort/ReverseAvroComparator.java
@@ -21,6 +21,7 @@ import org.apache.avro.Schema;
import org.apache.avro.io.BinaryData;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.types.avro.AvroMode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.RawComparator;
@@ -28,18 +29,20 @@ import org.apache.hadoop.io.RawComparator;
public class ReverseAvroComparator<T> extends Configured implements RawComparator<AvroKey<T>> {
private Schema schema;
+ private AvroMode mode;
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
if (conf != null) {
schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
+ mode = AvroMode.fromShuffleConfiguration(conf);
}
}
@Override
public int compare(AvroKey<T> o1, AvroKey<T> o2) {
- return ReflectData.get().compare(o2.datum(), o1.datum(), schema);
+ return mode.getData().compare(o2.datum(), o1.datum(), schema);
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 56ec459..4a98228 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -153,8 +153,7 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
@Override
protected DatumReader<T> createDatumReader(Configuration conf) {
- AvroMode.REFLECT.configureFactory(conf);
- return AvroMode.REFLECT.getReader(getSchema());
+ return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getReader(getSchema());
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index a97f917..7178274 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -101,7 +101,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
options.configure(job);
}
- AvroMode.fromType(att).configureShuffle(conf);
+ AvroMode.fromType(att).withFactoryFromConfiguration(conf).configureShuffle(conf);
Collection<String> serializations = job.getConfiguration().getStringCollection(
"io.serializations");
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
index 90ff791..3d7fbfa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
@@ -51,12 +51,12 @@ public class AvroMode implements ReaderWriterFactory {
/**
* Default mode to use for reading and writing {@link ReflectData Reflect} types.
*/
- public static final AvroMode REFLECT = new AvroMode(ModeType.REFLECT, new ReflectDataFactory(), Avros.REFLECT_DATA_FACTORY_CLASS);
+ public static final AvroMode REFLECT = new AvroMode(ModeType.REFLECT, Avros.REFLECT_DATA_FACTORY_CLASS);
/**
* Default mode to use for reading and writing {@link SpecificData Specific} types.
*/
- public static final AvroMode SPECIFIC =new AvroMode(ModeType.SPECIFIC, "crunch.specificfactory");
+ public static final AvroMode SPECIFIC = new AvroMode(ModeType.SPECIFIC, "crunch.specificfactory");
/**
* Default mode to use for reading and writing {@link GenericData Generic} types.
*/
@@ -95,11 +95,11 @@ public class AvroMode implements ReaderWriterFactory {
if (type.hasSpecific()) {
Avros.checkCombiningSpecificAndReflectionSchemas();
}
- return AvroMode.REFLECT;
+ return REFLECT;
} else if (type.hasSpecific()) {
- return AvroMode.SPECIFIC;
+ return SPECIFIC;
} else {
- return AvroMode.GENERIC;
+ return GENERIC;
}
}
@@ -352,7 +352,7 @@ public class AvroMode implements ReaderWriterFactory {
}
@SuppressWarnings("unchecked")
- AvroMode withFactoryFromConfiguration(Configuration conf) {
+ public AvroMode withFactoryFromConfiguration(Configuration conf) {
// although the shuffle and input/output use different properties for mode,
// this is shared - only one ReaderWriterFactory can be used.
Class<?> factoryClass = conf.getClass(propName, this.getClass());
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 5a4499d..6dbb6de 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 1fcb30e..62945b1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -100,14 +100,11 @@ public class Avros {
*
* @deprecated as of 0.9.0; use AvroMode.REFLECT.override(ReaderWriterFactory)
*/
- public static ReflectDataFactory REFLECT_DATA_FACTORY =
- (ReflectDataFactory) AvroMode.REFLECT.getFactory();
+ public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
/**
* The name of the configuration parameter that tracks which reflection
* factory to use.
- *
- * @deprecated as of 0.9.0; use AvroMode.REFLECT.override(ReaderWriterFactory)
*/
public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
@@ -273,7 +270,11 @@ public class Avros {
}
public static final <T> AvroType<T> reflects(Class<T> clazz) {
- Schema schema = REFLECT_DATA_FACTORY.getData().getSchema(clazz);
+ Schema schema = ((ReflectData) AvroMode.REFLECT.getData()).getSchema(clazz);
+ return reflects(clazz, schema);
+ }
+
+ public static final <T> AvroType<T> reflects(Class<T> clazz, Schema schema) {
return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema));
}
@@ -541,6 +542,7 @@ public class Avros {
private final String jsonSchema;
private final boolean isReflect;
private transient Schema schema;
+ private transient AvroMode mode;
public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
this.fns = Lists.newArrayList();
@@ -585,11 +587,16 @@ public class Avros {
for (MapFn fn : fns) {
fn.initialize();
}
+ if (getConfiguration() != null) {
+ mode = AvroMode.REFLECT.withFactoryFromConfiguration(getConfiguration());
+ } else {
+ mode = AvroMode.REFLECT;
+ }
}
private GenericRecord createRecord() {
if (isReflect) {
- return new ReflectGenericRecord(schema);
+ return new ReflectGenericRecord(schema, mode);
} else {
return new GenericData.Record(schema);
}
@@ -693,6 +700,7 @@ public class Avros {
private final String jsonSchema;
private final boolean isReflect;
private transient Schema schema;
+ private transient AvroMode mode;
public TupleToUnionRecord(Schema schema, PType<?>... ptypes) {
this.fns = Lists.newArrayList();
@@ -737,11 +745,16 @@ public class Avros {
for (MapFn fn : fns) {
fn.initialize();
}
+ if (getConfiguration() != null) {
+ mode = AvroMode.REFLECT.withFactoryFromConfiguration(getConfiguration());
+ } else {
+ mode = AvroMode.REFLECT;
+ }
}
private GenericRecord createRecord() {
if (isReflect) {
- return new ReflectGenericRecord(schema);
+ return new ReflectGenericRecord(schema, mode);
} else {
return new GenericData.Record(schema);
}
@@ -850,20 +863,23 @@ public class Avros {
private static class ReflectGenericRecord extends GenericData.Record {
- public ReflectGenericRecord(Schema schema) {
+ private AvroMode mode;
+
+ public ReflectGenericRecord(Schema schema, AvroMode mode) {
super(schema);
+ this.mode = mode;
}
@Override
public int hashCode() {
- return reflectAwareHashCode(this, getSchema());
+ return reflectAwareHashCode(this, getSchema(), mode);
}
}
/*
* TODO: Remove this once we no longer have to support 1.5.4.
*/
- private static int reflectAwareHashCode(Object o, Schema s) {
+ private static int reflectAwareHashCode(Object o, Schema s, AvroMode mode) {
if (o == null)
return 0; // incomplete datum
int hashCode = 1;
@@ -872,17 +888,17 @@ public class Avros {
for (Schema.Field f : s.getFields()) {
if (f.order() == Schema.Field.Order.IGNORE)
continue;
- hashCode = hashCodeAdd(hashCode, ReflectData.get().getField(o, f.name(), f.pos()), f.schema());
+ hashCode = hashCodeAdd(hashCode, mode.getData().getField(o, f.name(), f.pos()), f.schema(), mode);
}
return hashCode;
case ARRAY:
Collection<?> a = (Collection<?>) o;
Schema elementType = s.getElementType();
for (Object e : a)
- hashCode = hashCodeAdd(hashCode, e, elementType);
+ hashCode = hashCodeAdd(hashCode, e, elementType, mode);
return hashCode;
case UNION:
- return reflectAwareHashCode(o, s.getTypes().get(ReflectData.get().resolveUnion(s, o)));
+ return reflectAwareHashCode(o, s.getTypes().get(mode.getData().resolveUnion(s, o)), mode);
case ENUM:
return s.getEnumOrdinal(o.toString());
case NULL:
@@ -895,8 +911,8 @@ public class Avros {
}
/** Add the hash code for an object into an accumulated hash code. */
- private static int hashCodeAdd(int hashCode, Object o, Schema s) {
- return 31 * hashCode + reflectAwareHashCode(o, s);
+ private static int hashCodeAdd(int hashCode, Object o, Schema s, AvroMode mode) {
+ return 31 * hashCode + reflectAwareHashCode(o, s, mode);
}
private Avros() {
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
index 3a5d6f4..65c1f49 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
@@ -32,11 +32,6 @@ public class ReflectDataFactory implements ReaderWriterFactory {
return ReflectData.AllowNull.get();
}
- // for backwards-compatibility
- public ReflectData getReflectData() {
- return getData();
- }
-
@Override
public <T> ReflectDatumReader<T> getReader(Schema schema) {
return new ReflectDatumReader<T>(schema);
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
index c931754..816993b 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
@@ -73,7 +73,7 @@ class DeepCopyTest extends CrunchSuite {
@SuppressWarnings(Array("rawtypes", "unchecked"))
private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) {
val r: AnyRef = records.iterator.next()
- val schema = new ScalaReflectDataFactory().getReflectData.getSchema(r.getClass)
+ val schema = new ScalaReflectDataFactory().getData.getSchema(r.getClass)
val writer = new ScalaReflectDataFactory().getWriter[T](schema)
val dataFileWriter = new DataFileWriter(writer)
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
index 3d3cb9f..f7ccf1a 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
@@ -27,12 +27,12 @@ import scala.collection.mutable.HashMap
import _root_.org.junit.Assert._
import _root_.org.junit.Test
-case class PageRankData(page_rank: Float, oldpr: Float, urls: Array[String]) {
- def this() = this(0f, 0f, null)
+case class PageRankData(page_rank: Float, oldpr: Float, urls: Array[String], bytes: Array[Byte]) {
+ def this() = this(0f, 0f, null, Array[Byte](0))
def scaledPageRank = page_rank / urls.length
- def next(newPageRank: Float) = new PageRankData(newPageRank, page_rank, urls)
+ def next(newPageRank: Float) = new PageRankData(newPageRank, page_rank, urls, bytes)
def delta = math.abs(page_rank - oldpr)
}
@@ -67,7 +67,7 @@ class PageRankClassTest extends CrunchSuite {
pipeline.read(from.textFile(fileName, Avros.strings))
.map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
.groupByKey
- .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray)))
+ .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray, Array[Byte](0))))
}
def update(prev: PTable[String, PageRankData], d: Float) = {
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
index e3d4eb2..7fd962b 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaReflectDataFactory.java
@@ -22,22 +22,24 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.crunch.types.avro.ReaderWriterFactory;
import org.apache.crunch.types.avro.ReflectDataFactory;
/**
* An implementation of the {@code ReflectDataFactory} class to work with Scala classes.
*/
-public class ScalaReflectDataFactory extends ReflectDataFactory {
+public class ScalaReflectDataFactory implements ReaderWriterFactory {
@Override
- public ReflectData getReflectData() { return ScalaSafeReflectData.get(); }
+ public ReflectData getData() { return ScalaSafeReflectData.getInstance(); }
@Override
public <T> ReflectDatumReader<T> getReader(Schema schema) {
return new ScalaSafeReflectDatumReader<T>(schema);
}
-
- public <T> ReflectDatumWriter<T> getWriter() {
- return new ScalaSafeReflectDatumWriter<T>();
+
+ @Override
+ public <T> ReflectDatumWriter<T> getWriter(Schema schema) {
+ return new ScalaSafeReflectDatumWriter<T>(schema);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
index 6118834..6885f3e 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectData.java
@@ -48,7 +48,7 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull {
private static final ScalaSafeReflectData INSTANCE = new ScalaSafeReflectData();
- public static ScalaSafeReflectData get() { return INSTANCE; }
+ public static ScalaSafeReflectData getInstance() { return INSTANCE; }
static final String CLASS_PROP = "java-class";
static final String ELEMENT_PROP = "java-element-class";
@@ -88,7 +88,7 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull {
protected Schema createSchema(Type type, Map<String,Schema> names) {
if (type instanceof GenericArrayType) { // generic array
Type component = ((GenericArrayType)type).getGenericComponentType();
- if (component == Byte.TYPE) // byte array
+ if (component == Byte.TYPE) // byte array
return Schema.create(Schema.Type.BYTES);
Schema result = Schema.createArray(createSchema(component, names));
setElement(result, component);
@@ -127,8 +127,11 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull {
return super.createSchema(type, names);
if (c.isArray()) { // array
Class component = c.getComponentType();
- if (component == Byte.TYPE) // byte array
- return Schema.create(Schema.Type.BYTES);
+ if (component == Byte.TYPE) { // byte array
+ Schema result = Schema.create(Schema.Type.BYTES);
+ result.addProp(CLASS_PROP, c.getName()); // For scala-specific byte arrays
+ return result;
+ }
Schema result = Schema.createArray(createSchema(component, names));
setElement(result, component);
return result;
@@ -289,4 +292,14 @@ public class ScalaSafeReflectData extends ReflectData.AllowNull {
protected boolean isMap(Object datum) {
return (datum instanceof java.util.Map) || (datum instanceof scala.collection.Map);
}
+
+ @Override
+ protected String getSchemaName(Object datum) {
+ if (datum != null) {
+ if(byte[].class.isAssignableFrom(datum.getClass())) {
+ return Schema.Type.BYTES.getName();
+ }
+ }
+ return super.getSchemaName(datum);
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
index 123b45e..bbe7305 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java
@@ -26,6 +26,7 @@ import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.io.ResolvingDecoder;
+import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.hadoop.util.ReflectionUtils;
@@ -34,7 +35,7 @@ import scala.collection.JavaConversions;
public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
public ScalaSafeReflectDatumReader(Schema schema) {
- super(schema, schema, ScalaSafeReflectData.get());
+ super(schema, schema, ScalaSafeReflectData.getInstance());
}
@Override
@@ -100,25 +101,28 @@ public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
@Override
@SuppressWarnings("unchecked")
protected Object newArray(Object old, int size, Schema schema) {
- ScalaSafeReflectData data = ScalaSafeReflectData.get();
- Class collectionClass = ScalaSafeReflectData.getClassProp(schema,
- ScalaSafeReflectData.CLASS_PROP);
- if (collectionClass != null) {
+ Class collectionClass =
+ ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.CLASS_PROP);
+ Class elementClass =
+ ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.ELEMENT_PROP);
+
+ if (collectionClass == null && elementClass == null)
+ return super.newArray(old, size, schema); // use specific/generic
+
+ ScalaSafeReflectData data = ScalaSafeReflectData.getInstance();
+ if (collectionClass != null && !collectionClass.isArray()) {
if (old instanceof Collection) {
((Collection)old).clear();
return old;
}
if (scala.collection.Iterable.class.isAssignableFrom(collectionClass) ||
- collectionClass.isAssignableFrom(ArrayList.class)) {
- return Lists.newArrayList();
- }
- return ReflectionUtils.newInstance(collectionClass, null);
+ collectionClass.isAssignableFrom(ArrayList.class))
+ return new ArrayList();
+ return data.newInstance(collectionClass, schema);
}
- Class elementClass = ScalaSafeReflectData.getClassProp(schema,
- ScalaSafeReflectData.ELEMENT_PROP);
- if (elementClass == null) {
+
+ if (elementClass == null)
elementClass = data.getClass(schema.getElementType());
- }
return Array.newInstance(elementClass, size);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java
index a19d6fb..1ac768c 100644
--- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java
+++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumWriter.java
@@ -20,6 +20,7 @@ package org.apache.crunch.scrunch;
import java.util.Iterator;
import java.util.Map;
+import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectDatumWriter;
import scala.collection.JavaConversions;
@@ -28,10 +29,10 @@ import scala.collection.JavaConversions;
*
*/
public class ScalaSafeReflectDatumWriter<T> extends ReflectDatumWriter<T> {
- public ScalaSafeReflectDatumWriter() {
- super(ScalaSafeReflectData.get());
+ public ScalaSafeReflectDatumWriter(Schema schema) {
+ super(schema, ScalaSafeReflectData.getInstance());
}
-
+
@Override
protected long getArraySize(Object array) {
if (array instanceof scala.collection.Iterable) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index dc0ab0b..31c2f8a 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -69,7 +69,7 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
}
def materialize() = {
- InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
+ setupRun()
JavaConversions.iterableAsScalaIterable[S](native.materialize)
}
@@ -86,9 +86,15 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
count.mapValues(_.longValue())
}
- def max()(implicit converter: Converter[S, S]) = PObject(Aggregate.max(native))(converter)
+ def max()(implicit converter: Converter[S, S]) = {
+ setupRun()
+ PObject(Aggregate.max(native))(converter)
+ }
- def min()(implicit converter: Converter[S, S]) = PObject(Aggregate.min(native))(converter)
+ def min()(implicit converter: Converter[S, S]) = {
+ setupRun()
+ PObject(Aggregate.min(native))(converter)
+ }
def sample(acceptanceProbability: Double) = {
wrap(Sample.sample(native, acceptanceProbability))
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index 1e2e890..35b90be 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -41,6 +41,10 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
protected def wrapPairFlatMapFn[K, V](fmt: FunctionType[TraversableOnce[(K, V)]]): DoFn[S, JPair[K, V]]
protected def wrapPairMapFn[K, V](fmt: FunctionType[(K, V)]): MapFn[S, JPair[K, V]]
+ protected def setupRun() {
+ PipelineLike.setupConf(native.getPipeline().getConfiguration())
+ }
+
/**
* Returns the underlying PCollection wrapped by this instance.
*/
@@ -254,6 +258,7 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
* @return The number of elements in this PCollection.
*/
def length(): PObject[Long] = {
+ setupRun()
PObject(native.length())
}
@@ -262,6 +267,7 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
* @return
*/
def asSeq(): PObject[Seq[S]] = {
+ setupRun()
PObject(native.asCollection())
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 1d5a70e..aefad67 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -155,16 +155,17 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
def incrementIfValue(f: V => Boolean) = new IncrementIfPTable[K, V](this, incValueFn(f))
def materialize(): Iterable[(K, V)] = {
- InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
+ setupRun()
native.materialize.view.map(x => (x.first, x.second))
}
def materializeToMap(): Map[K, V] = {
- InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
+ setupRun()
native.materializeToMap().view.toMap
}
def asMap(): PObject[Map[K, V]] = {
+ setupRun()
PObject(native.asMap())
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index 394e2ac..aadb026 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -178,7 +178,9 @@ object Avros extends PTypeFamily {
CAvros.specifics(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
}
- def reflects[T: ClassTag]() = {
- CAvros.reflects(implicitly[ClassTag[T]].runtimeClass).asInstanceOf[AvroType[T]]
+ def reflects[T: ClassTag](): AvroType[T] = {
+ val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+ val schema = ScalaSafeReflectData.getInstance().getSchema(clazz)
+ CAvros.reflects(clazz, schema)
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
index 67c9b14..a72dc7a 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
@@ -53,6 +53,7 @@ import org.apache.crunch.types.{PTableType, PType}
* }}}
*/
class Pipeline(val jpipeline: JPipeline) extends PipelineLike {
+
/**
* A convenience method for reading a text file.
*
http://git-wip-us.apache.org/repos/asf/crunch/blob/583ea6da/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index 27c43a7..b800612 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -26,6 +26,9 @@ import org.apache.crunch.types.{PTableType, PType}
trait PipelineLike {
def jpipeline: JPipeline
+ // Call this to ensure we set this up before any subsequent calls to the system
+ PipelineLike.setupConf(getConfiguration())
+
/**
* Gets the configuration object associated with this pipeline.
*/
@@ -110,12 +113,13 @@ trait PipelineLike {
*/
def emptyPTable[K, V](pt: PTableType[K, V]) = new PTable[K, V](jpipeline.emptyPTable(pt))
+
/**
* Returns a handler for controlling the execution of the underlying MapReduce
* pipeline.
*/
def runAsync(): PipelineExecution = {
- InterpreterRunner.addReplJarsToJob(getConfiguration())
+ PipelineLike.setupConf(getConfiguration())
jpipeline.runAsync()
}
@@ -124,7 +128,7 @@ trait PipelineLike {
* to write data to the output targets.
*/
def run(): PipelineResult = {
- InterpreterRunner.addReplJarsToJob(getConfiguration())
+ PipelineLike.setupConf(getConfiguration())
jpipeline.run()
}
@@ -134,7 +138,7 @@ trait PipelineLike {
* this run or previous calls to `run`.
*/
def done(): PipelineResult = {
- InterpreterRunner.addReplJarsToJob(getConfiguration())
+ PipelineLike.setupConf(getConfiguration())
jpipeline.done()
}
@@ -152,3 +156,13 @@ trait PipelineLike {
*/
def debug(): Unit = jpipeline.enableDebug()
}
+
+object PipelineLike {
+ def setupConf(conf: Configuration) {
+ InterpreterRunner.addReplJarsToJob(conf)
+ if (conf.get("crunch.reflectdatafactory", "").isEmpty) {
+ // Enables the Scala-specific ReflectDataFactory
+ conf.set("crunch.reflectdatafactory", classOf[ScalaReflectDataFactory].getCanonicalName)
+ }
+ }
+}
\ No newline at end of file