You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/06/29 23:30:30 UTC
git commit: Improve compatibility with Avro ReflectDatumReader
Updated Branches:
refs/heads/master 76c501568 -> fe53b71fe
Improve compatibility with Avro ReflectDatumReader
Allow Avro-based readers to correctly select between the
ReflectDatumReader, GenericDatumReader, and SpecificDatumReader,
allowing POJOs to be used fully throughout pipelines.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/fe53b71f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/fe53b71f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/fe53b71f
Branch: refs/heads/master
Commit: fe53b71fe3ecf049eaa5fc3cfc99c8fa0cdd5169
Parents: 76c5015
Author: Gabriel Reid <ga...@gmail.com>
Authored: Fri Jun 29 23:26:20 2012 +0200
Committer: Gabriel Reid <ga...@gmail.com>
Committed: Fri Jun 29 23:26:20 2012 +0200
----------------------------------------------------------------------
.../crunch/io/avro/AvroFileReaderFactory.java | 83 +-
.../cloudera/crunch/types/avro/AvroTableType.java | 246 ++--
.../com/cloudera/crunch/types/avro/AvroType.java | 34 +-
.../java/com/cloudera/crunch/types/avro/Avros.java | 1082 ++++++++-------
.../crunch/io/avro/AvroFileReaderFactoryTest.java | 67 +-
.../crunch/io/avro/AvroFileSourceTargetTest.java | 60 +-
.../cloudera/crunch/io/avro/AvroReflectTest.java | 98 ++
.../cloudera/crunch/types/avro/AvroTypeTest.java | 37 +-
8 files changed, 976 insertions(+), 731 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java b/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
index 82eb379..a3d673c 100644
--- a/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
+++ b/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
@@ -21,6 +21,7 @@ import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
+import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,47 +37,51 @@ import com.google.common.collect.UnmodifiableIterator;
public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
- private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
+ private static final Log LOG = LogFactory
+ .getLog(AvroFileReaderFactory.class);
- private final DatumReader<T> recordReader;
- private final MapFn<T, T> mapFn;
- private final Configuration conf;
-
- public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
- this.recordReader = createDatumReader(atype);
- this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
- this.conf = conf;
- }
+ private final DatumReader<T> recordReader; // chosen once per AvroType in createDatumReader
+ private final MapFn<T, T> mapFn; // applied to every datum returned by the iterator
+ private final Configuration conf; // handed to mapFn.setConfigurationForTest in read
- private DatumReader<T> createDatumReader(AvroType<T> avroType) {
- if (avroType.isSpecific()) {
- return new SpecificDatumReader<T>(avroType.getSchema());
- } else {
- return new GenericDatumReader<T>(avroType.getSchema());
- }
- }
+ public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
+ this.recordReader = createDatumReader(atype);
+ this.mapFn = (MapFn<T, T>) atype.getInputMapFn(); // unchecked cast of the type input map fn
+ this.conf = conf;
+ }
- @Override
- public Iterator<T> read(FileSystem fs, final Path path) {
- this.mapFn.setConfigurationForTest(conf);
- this.mapFn.initialize();
- try {
- FsInput fsi = new FsInput(path, fs.getConf());
- final DataFileReader<T> reader = new DataFileReader<T>(fsi, recordReader);
- return new UnmodifiableIterator<T>() {
- @Override
- public boolean hasNext() {
- return reader.hasNext();
- }
+ private DatumReader<T> createDatumReader(AvroType<T> avroType) { // order: specific, then generic, then reflect
+ if (avroType.isSpecific()) { // SpecificRecord type or direct sub-type
+ return new SpecificDatumReader<T>(avroType.getSchema());
+ } else if (avroType.isGeneric()) { // generic-data type, as reported by isGeneric()
+ return new GenericDatumReader<T>(avroType.getSchema());
+ } else { // anything else, e.g. POJOs, is read via reflection
+ return new ReflectDatumReader<T>(avroType.getSchema());
+ }
+ }
- @Override
- public T next() {
- return mapFn.map(reader.next());
- }
- };
- } catch (IOException e) {
- LOG.info("Could not read avro file at path: " + path, e);
- return Iterators.emptyIterator();
- }
- }
+ @Override
+ public Iterator<T> read(FileSystem fs, final Path path) { // lazily maps each datum in the file at path
+ this.mapFn.setConfigurationForTest(conf);
+ this.mapFn.initialize();
+ try {
+ FsInput fsi = new FsInput(path, fs.getConf());
+ final DataFileReader<T> reader = new DataFileReader<T>(fsi, // NOTE(review): reader is never closed, even when iteration ends
+ recordReader);
+ return new UnmodifiableIterator<T>() {
+ @Override
+ public boolean hasNext() {
+ return reader.hasNext();
+ }
+
+ @Override
+ public T next() {
+ return mapFn.map(reader.next());
+ }
+ };
+ } catch (IOException e) { // unreadable path: log at info level and yield no records
+ LOG.info("Could not read avro file at path: " + path, e);
+ return Iterators.emptyIterator();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
index 9a93ce3..8d71b7f 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
@@ -26,123 +26,133 @@ import com.cloudera.crunch.types.PType;
/**
* The implementation of the PTableType interface for Avro-based serialization.
- *
+ * Its Avro schema is an org.apache.avro.mapred.Pair of the key and value schemas.
*/
-public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K, V> {
-
- private static class PairToAvroPair extends MapFn<Pair, org.apache.avro.mapred.Pair> {
- private final MapFn keyMapFn;
- private final MapFn valueMapFn;
- private final String firstJson;
- private final String secondJson;
-
- private String pairSchemaJson;
- private transient Schema pairSchema;
-
- public PairToAvroPair(AvroType keyType, AvroType valueType) {
- this.keyMapFn = keyType.getOutputMapFn();
- this.firstJson = keyType.getSchema().toString();
- this.valueMapFn = valueType.getOutputMapFn();
- this.secondJson = valueType.getSchema().toString();
- }
-
- @Override
- public void configure(Configuration conf) {
- keyMapFn.configure(conf);
- valueMapFn.configure(conf);
- }
-
-
- @Override
- public void setConfigurationForTest(Configuration conf) {
- keyMapFn.setConfigurationForTest(conf);
- valueMapFn.setConfigurationForTest(conf);
- }
-
- @Override
- public void initialize() {
- keyMapFn.setContext(getContext());
- valueMapFn.setContext(getContext());
- pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
- new Schema.Parser().parse(firstJson),
- new Schema.Parser().parse(secondJson)).toString();
- }
-
- @Override
- public org.apache.avro.mapred.Pair map(Pair input) {
- if(pairSchema == null) {
- pairSchema = new Schema.Parser().parse(pairSchemaJson);
- }
- org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(pairSchema);
- avroPair.key(keyMapFn.map(input.first()));
- avroPair.value(valueMapFn.map(input.second()));
- return avroPair;
- }
- }
-
- private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> {
-
- private final MapFn firstMapFn;
- private final MapFn secondMapFn;
-
- public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
- this.firstMapFn = firstMapFn;
- this.secondMapFn = secondMapFn;
- }
-
- @Override
- public void configure(Configuration conf) {
- firstMapFn.configure(conf);
- secondMapFn.configure(conf);
- }
-
- @Override
- public void setConfigurationForTest(Configuration conf) {
- firstMapFn.setConfigurationForTest(conf);
- secondMapFn.setConfigurationForTest(conf);
- }
-
- @Override
- public void initialize() {
- firstMapFn.setContext(getContext());
- secondMapFn.setContext(getContext());
- }
-
- @Override
- public Pair map(IndexedRecord input) {
- return Pair.of(firstMapFn.map(input.get(0)), secondMapFn.map(input.get(1)));
- }
- }
-
- private final AvroType<K> keyType;
- private final AvroType<V> valueType;
-
- public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
- super(pairClass,
- org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), valueType.getSchema()),
- new IndexedRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()),
- new PairToAvroPair(keyType, valueType), keyType, valueType);
- this.keyType = keyType;
- this.valueType = valueType;
- }
-
- @Override
- public boolean isSpecific() {
- return keyType.isSpecific() || valueType.isSpecific();
- }
-
- @Override
- public PType<K> getKeyType() {
- return keyType;
- }
-
- @Override
- public PType<V> getValueType() {
- return valueType;
- }
-
- @Override
- public PGroupedTableType<K, V> getGroupedTableType() {
- return new AvroGroupedTableType<K, V>(this);
- }
+public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements
+ PTableType<K, V> {
+
+ private static class PairToAvroPair extends // Crunch Pair -> Avro mapred Pair (output side)
+ MapFn<Pair, org.apache.avro.mapred.Pair> {
+ private final MapFn keyMapFn;
+ private final MapFn valueMapFn;
+ private final String firstJson; // key schema as JSON, re-parsed in initialize()
+ private final String secondJson; // value schema as JSON, re-parsed in initialize()
+
+ private String pairSchemaJson; // built in initialize() from the two JSON schemas
+ private transient Schema pairSchema; // parsed lazily from pairSchemaJson on first map()
+
+ public PairToAvroPair(AvroType keyType, AvroType valueType) {
+ this.keyMapFn = keyType.getOutputMapFn();
+ this.firstJson = keyType.getSchema().toString();
+ this.valueMapFn = valueType.getOutputMapFn();
+ this.secondJson = valueType.getSchema().toString();
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ keyMapFn.configure(conf);
+ valueMapFn.configure(conf);
+ }
+
+ @Override
+ public void setConfigurationForTest(Configuration conf) {
+ keyMapFn.setConfigurationForTest(conf);
+ valueMapFn.setConfigurationForTest(conf);
+ }
+
+ @Override
+ public void initialize() {
+ keyMapFn.setContext(getContext());
+ valueMapFn.setContext(getContext());
+ pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
+ new Schema.Parser().parse(firstJson),
+ new Schema.Parser().parse(secondJson)).toString();
+ }
+
+ @Override
+ public org.apache.avro.mapred.Pair map(Pair input) {
+ if (pairSchema == null) {
+ pairSchema = new Schema.Parser().parse(pairSchemaJson);
+ }
+ org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(
+ pairSchema);
+ avroPair.key(keyMapFn.map(input.first()));
+ avroPair.value(valueMapFn.map(input.second()));
+ return avroPair;
+ }
+ }
+
+ private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> { // Avro record (field 0 = key, field 1 = value) -> Crunch Pair
+
+ private final MapFn firstMapFn;
+ private final MapFn secondMapFn;
+
+ public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
+ this.firstMapFn = firstMapFn;
+ this.secondMapFn = secondMapFn;
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ firstMapFn.configure(conf);
+ secondMapFn.configure(conf);
+ }
+
+ @Override
+ public void setConfigurationForTest(Configuration conf) {
+ firstMapFn.setConfigurationForTest(conf);
+ secondMapFn.setConfigurationForTest(conf);
+ }
+
+ @Override
+ public void initialize() {
+ firstMapFn.setContext(getContext());
+ secondMapFn.setContext(getContext());
+ }
+
+ @Override
+ public Pair map(IndexedRecord input) {
+ return Pair.of(firstMapFn.map(input.get(0)),
+ secondMapFn.map(input.get(1)));
+ }
+ }
+
+ private final AvroType<K> keyType;
+ private final AvroType<V> valueType;
+
+ public AvroTableType(AvroType<K> keyType, AvroType<V> valueType,
+ Class<Pair<K, V>> pairClass) {
+ super(pairClass, org.apache.avro.mapred.Pair.getPairSchema( // table schema: Avro mapred Pair of key/value schemas
+ keyType.getSchema(), valueType.getSchema()),
+ new IndexedRecordToPair(keyType.getInputMapFn(),
+ valueType.getInputMapFn()), new PairToAvroPair(keyType,
+ valueType), keyType, valueType);
+ this.keyType = keyType;
+ this.valueType = valueType;
+ }
+
+ @Override
+ public boolean isSpecific() { // true if either the key or the value type is specific
+ return keyType.isSpecific() || valueType.isSpecific();
+ }
+
+ @Override
+ public boolean isGeneric() { // true if either the key or the value type is generic
+ return keyType.isGeneric() || valueType.isGeneric(); // NOTE(review): AvroFileReaderFactory tests isSpecific() first, so it wins if both are true
+ }
+
+ @Override
+ public PType<K> getKeyType() {
+ return keyType;
+ }
+
+ @Override
+ public PType<V> getValueType() {
+ return valueType;
+ }
+
+ @Override
+ public PGroupedTableType<K, V> getGroupedTableType() {
+ return new AvroGroupedTableType<K, V>(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
index e9696d8..3db00c0 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
@@ -17,6 +17,7 @@ package com.cloudera.crunch.types.avro;
import java.util.List;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.Path;
@@ -46,8 +47,8 @@ public class AvroType<T> implements PType<T> {
private final List<PType> subTypes;
public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
- this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(),
- ptypes);
+ this(typeClass, schema, IdentityFn.getInstance(), IdentityFn
+ .getInstance(), ptypes); // IdentityFn for both map fn arguments
}
public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn,
@@ -79,20 +80,29 @@ public class AvroType<T> implements PType<T> {
}
/**
- * Determine if the wrapped type is a specific or generic avro type.
+ * Determine if the wrapped type, or any of its direct sub-types, is a specific data avro type.
*
* @return true if the wrapped type is a specific data type
*/
public boolean isSpecific() {
- if (SpecificRecord.class.isAssignableFrom(typeClass)) {
- return true;
- }
- for (PType ptype : subTypes) {
- if (SpecificRecord.class.isAssignableFrom(ptype.getTypeClass())) {
- return true;
- }
- }
- return false;
+ if (SpecificRecord.class.isAssignableFrom(typeClass)) {
+ return true;
+ }
+ for (PType ptype : subTypes) { // direct sub-types only; nested sub-types are not searched
+ if (SpecificRecord.class.isAssignableFrom(ptype.getTypeClass())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Determine if the wrapped type is a generic data avro type, i.e. exactly GenericData.Record.
+ * Unlike isSpecific(), sub-types are not consulted.
+ * @return true if the wrapped type class is GenericData.Record
+ */
+ public boolean isGeneric() {
+ return GenericData.Record.class.equals(typeClass);
}
public MapFn<Object, T> getInputMapFn() {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/main/java/com/cloudera/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/Avros.java b/src/main/java/com/cloudera/crunch/types/avro/Avros.java
index d084e3b..d2edae2 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/Avros.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/Avros.java
@@ -55,530 +55,566 @@ import com.google.common.collect.Maps;
/**
* Defines static methods that are analogous to the methods defined in
* {@link AvroTypeFamily} for convenient static importing.
- *
+ *
*/
public class Avros {
- /**
- * The instance we use for generating reflected schemas. May be modified by clients (e.g., Scrunch.)
- */
- public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
-
- /**
- * The name of the configuration parameter that tracks which reflection factory to use.
- */
- public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
-
- public static void configureReflectDataFactory(Configuration conf) {
- conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(),
- ReflectDataFactory.class);
- }
-
- public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
- return (ReflectDataFactory) ReflectionUtils.newInstance(
- conf.getClass(REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
- }
-
- public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
- @Override
- public String map(CharSequence input) {
- return input.toString();
- }
- };
-
- public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
- @Override
- public Utf8 map(String input) {
- return new Utf8(input);
- }
- };
-
- public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
- @Override
- public ByteBuffer map(Object input) {
- if (input instanceof ByteBuffer) {
- return (ByteBuffer) input;
- }
- return ByteBuffer.wrap((byte[]) input);
- }
- };
-
- private static final AvroType<String> strings = new AvroType<String>(
- String.class, Schema.create(Schema.Type.STRING), UTF8_TO_STRING, STRING_TO_UTF8);
- private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
- private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
- private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
- private static final AvroType<Float> floats = create(Float.class, Schema.Type.FLOAT);
- private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE);
- private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
- private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(
- ByteBuffer.class, Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance());
-
- private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>>builder()
- .put(String.class, strings)
- .put(Long.class, longs)
- .put(Integer.class, ints)
- .put(Float.class, floats)
- .put(Double.class, doubles)
- .put(Boolean.class, booleans)
- .put(ByteBuffer.class, bytes)
- .build();
-
- private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
-
- public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
- EXTENSIONS.put(clazz, ptype);
- }
-
- public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
- return (PType<T>) PRIMITIVES.get(clazz);
- }
-
- private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
- return new AvroType<T>(clazz, Schema.create(schemaType));
- }
-
- public static final AvroType<Void> nulls() {
- return nulls;
- }
-
- public static final AvroType<String> strings() {
- return strings;
- }
-
- public static final AvroType<Long> longs() {
- return longs;
- }
-
- public static final AvroType<Integer> ints() {
- return ints;
- }
-
- public static final AvroType<Float> floats() {
- return floats;
- }
-
- public static final AvroType<Double> doubles() {
- return doubles;
- }
-
- public static final AvroType<Boolean> booleans() {
- return booleans;
- }
-
- public static final AvroType<ByteBuffer> bytes() {
- return bytes;
- }
-
- public static final <T> AvroType<T> records(Class<T> clazz) {
- if (EXTENSIONS.containsKey(clazz)) {
- return (AvroType<T>) EXTENSIONS.get(clazz);
- }
- return containers(clazz);
- }
-
- public static final AvroType<GenericData.Record> generics(Schema schema) {
- return new AvroType<GenericData.Record>(GenericData.Record.class, schema);
- }
-
- public static final <T> AvroType<T> containers(Class<T> clazz) {
- return reflects(clazz);
- }
-
- public static final <T> AvroType<T> reflects(Class<T> clazz) {
- return new AvroType<T>(clazz, REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz));
- }
-
- private static class BytesToWritableMapFn<T extends Writable> extends MapFn<ByteBuffer, T> {
- private static final Log LOG = LogFactory.getLog(BytesToWritableMapFn.class);
-
- private final Class<T> writableClazz;
-
- public BytesToWritableMapFn(Class<T> writableClazz) {
- this.writableClazz = writableClazz;
- }
-
- @Override
- public T map(ByteBuffer input) {
- T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration());
- try {
- instance.readFields(new DataInputStream(new ByteArrayInputStream(
- input.array(), input.arrayOffset(), input.limit())));
- } catch (IOException e) {
- LOG.error("Exception thrown reading instance of: " + writableClazz, e);
- }
- return instance;
- }
- }
-
- private static class WritableToBytesMapFn<T extends Writable> extends MapFn<T, ByteBuffer> {
- private static final Log LOG = LogFactory.getLog(WritableToBytesMapFn.class);
-
- @Override
- public ByteBuffer map(T input) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream das = new DataOutputStream(baos);
- try {
- input.write(das);
- } catch (IOException e) {
- LOG.error("Exception thrown converting Writable to bytes", e);
- }
- return ByteBuffer.wrap(baos.toByteArray());
- }
- }
-
- public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
- return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
- new WritableToBytesMapFn<T>());
- }
-
- private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
-
- private final MapFn<Object,T> mapFn;
-
- public GenericDataArrayToCollection(MapFn<Object,T> mapFn) {
- this.mapFn = mapFn;
- }
-
- @Override
- public void configure(Configuration conf) {
- mapFn.configure(conf);
- }
-
- @Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
-
- @Override
- public void initialize() {
- this.mapFn.setContext(getContext());
- }
-
- @Override
- public Collection<T> map(Object input) {
- Collection<T> ret = Lists.newArrayList();
- if (input instanceof Collection) {
- for (Object in : (Collection<Object>) input) {
- ret.add(mapFn.map(in));
- }
- } else {
- // Assume it is an array
- Object[] arr = (Object[]) input;
- for (Object in : arr) {
- ret.add(mapFn.map(in));
- }
- }
- return ret;
- }
- }
-
- private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
-
- private final MapFn mapFn;
- private final String jsonSchema;
- private transient Schema schema;
-
- public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
- this.mapFn = mapFn;
- this.jsonSchema = schema.toString();
- }
-
- @Override
- public void configure(Configuration conf) {
- mapFn.configure(conf);
- }
-
- @Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
-
- @Override
- public void initialize() {
- this.mapFn.setContext(getContext());
- }
-
- @Override
- public GenericData.Array<?> map(Collection<?> input) {
- if(schema == null) {
- schema = new Schema.Parser().parse(jsonSchema);
- }
- GenericData.Array array = new GenericData.Array(input.size(), schema);
- for (Object in : input) {
- array.add(mapFn.map(in));
- }
- return array;
- }
- }
-
- public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
- AvroType<T> avroType = (AvroType<T>) ptype;
- Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
- GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
- CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
- return new AvroType(Collection.class, collectionSchema, input, output, ptype);
- }
-
- private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> {
- private final MapFn<Object, T> mapFn;
-
- public AvroMapToMap(MapFn<Object, T> mapFn) {
- this.mapFn = mapFn;
- }
-
- @Override
- public void configure(Configuration conf) {
- mapFn.configure(conf);
- }
-
- @Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
-
- @Override
- public void initialize() {
- this.mapFn.setContext(getContext());
- }
-
- @Override
- public Map<String, T> map(Map<CharSequence, Object> input) {
- Map<String, T> out = Maps.newHashMap();
- for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
- out.put(e.getKey().toString(), mapFn.map(e.getValue()));
- }
- return out;
- }
- }
-
- private static class MapToAvroMap<T> extends MapFn<Map<String, T>, Map<Utf8, Object>> {
- private final MapFn<T, Object> mapFn;
-
- public MapToAvroMap(MapFn<T, Object> mapFn) {
- this.mapFn = mapFn;
- }
-
- @Override
- public void configure(Configuration conf) {
- mapFn.configure(conf);
- }
-
- @Override
- public void setConfigurationForTest(Configuration conf) {
- mapFn.setConfigurationForTest(conf);
- }
-
- @Override
- public void initialize() {
- this.mapFn.setContext(getContext());
- }
-
- @Override
- public Map<Utf8, Object> map(Map<String, T> input) {
- Map<Utf8, Object> out = Maps.newHashMap();
- for (Map.Entry<String, T> e : input.entrySet()) {
- out.put(new Utf8(e.getKey()), mapFn.map(e.getValue()));
- }
- return out;
- }
- }
-
- public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
- AvroType<T> avroType = (AvroType<T>) ptype;
- Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
- AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
- MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn());
- return new AvroType(Map.class, mapSchema, inputFn, outputFn, ptype);
- }
-
- private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> {
- private final TupleFactory<?> tupleFactory;
- private final List<MapFn> fns;
-
- private transient Object[] values;
-
- public GenericRecordToTuple(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
- this.tupleFactory = tupleFactory;
- this.fns = Lists.newArrayList();
- for (PType<?> ptype : ptypes) {
- AvroType atype = (AvroType) ptype;
- fns.add(atype.getInputMapFn());
- }
- }
-
- @Override
- public void configure(Configuration conf) {
- for (MapFn fn : fns) {
- fn.configure(conf);
- }
- }
-
- @Override
- public void setConfigurationForTest(Configuration conf) {
- for (MapFn fn : fns) {
- fn.setConfigurationForTest(conf);
- }
- }
-
- @Override
- public void initialize() {
- for (MapFn fn : fns) {
- fn.setContext(getContext());
- }
- this.values = new Object[fns.size()];
- tupleFactory.initialize();
- }
-
- @Override
- public Tuple map(GenericRecord input) {
- for (int i = 0; i < values.length; i++) {
- Object v = input.get(i);
- if (v == null) {
- values[i] = null;
- } else {
- values[i] = fns.get(i).map(v);
- }
- }
- return tupleFactory.makeTuple(values);
- }
- }
-
- private static class TupleToGenericRecord extends MapFn<Tuple, GenericRecord> {
- private final List<MapFn> fns;
- private final String jsonSchema;
-
- private transient GenericRecord record;
-
- public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
- this.fns = Lists.newArrayList();
- this.jsonSchema = schema.toString();
- for (PType ptype : ptypes) {
- AvroType atype = (AvroType) ptype;
- fns.add(atype.getOutputMapFn());
- }
- }
-
- @Override
- public void configure(Configuration conf) {
- for (MapFn fn : fns) {
- fn.configure(conf);
- }
- }
-
- @Override
- public void setConfigurationForTest(Configuration conf) {
- for (MapFn fn : fns) {
- fn.setConfigurationForTest(conf);
- }
- }
-
- @Override
- public void initialize() {
- this.record = new GenericData.Record(new Schema.Parser().parse(jsonSchema));
- for (MapFn fn : fns) {
- fn.setContext(getContext());
- }
- }
-
- @Override
- public GenericRecord map(Tuple input) {
- for (int i = 0; i < input.size(); i++) {
- Object v = input.get(i);
- if (v == null) {
- record.put(i, null);
- } else {
- record.put(i, fns.get(i).map(v));
- }
- }
- return record;
- }
- }
-
- public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
- Schema schema = createTupleSchema(p1, p2);
- GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2);
- TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
- return new AvroType(Pair.class, schema, input, output, p1, p2);
- }
-
- public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1,
- PType<V2> p2, PType<V3> p3) {
- Schema schema = createTupleSchema(p1, p2, p3);
- return new AvroType(Tuple3.class, schema,
- new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
- new TupleToGenericRecord(schema, p1, p2, p3),
- p1, p2, p3);
- }
-
- public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
- PType<V2> p2, PType<V3> p3, PType<V4> p4) {
- Schema schema = createTupleSchema(p1, p2, p3, p4);
- return new AvroType(Tuple4.class, schema,
- new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
- new TupleToGenericRecord(schema, p1, p2, p3, p4),
- p1, p2, p3, p4);
- }
-
- public static final AvroType<TupleN> tuples(PType... ptypes) {
- Schema schema = createTupleSchema(ptypes);
- return new AvroType(TupleN.class, schema,
- new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
- new TupleToGenericRecord(schema, ptypes), ptypes);
- }
-
- public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
- Schema schema = createTupleSchema(ptypes);
- Class[] typeArgs = new Class[ptypes.length];
- for (int i = 0; i < typeArgs.length; i++) {
- typeArgs[i] = ptypes[i].getTypeClass();
- }
- TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
- return new AvroType<T>(clazz, schema,
- new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema, ptypes),
- ptypes);
- }
-
- private static Schema createTupleSchema(PType<?>... ptypes) {
- // Guarantee each tuple schema has a globally unique name
- String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
- Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
- List<Schema.Field> fields = Lists.newArrayList();
- for (int i = 0; i < ptypes.length; i++) {
- AvroType atype = (AvroType) ptypes[i];
- Schema fieldSchema = allowNulls(atype.getSchema());
- fields.add(new Schema.Field("v" + i, fieldSchema, "", null));
- }
- schema.setFields(fields);
- return schema;
- }
-
- public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
- MapFn<T, S> outputFn, PType<S> base) {
- AvroType<S> abase = (AvroType<S>) base;
- return new AvroType<T>(clazz, abase.getSchema(),
- new CompositeMapFn(abase.getInputMapFn(), inputFn),
- new CompositeMapFn(outputFn, abase.getOutputMapFn()),
- base.getSubTypes().toArray(new PType[0]));
- }
-
- public static <T> PType<T> jsons(Class<T> clazz) {
- return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
- }
-
- public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key, PType<V> value) {
- AvroType<K> avroKey = (AvroType<K>) key;
- AvroType<V> avroValue = (AvroType<V>) value;
- return new AvroTableType(avroKey, avroValue, Pair.class);
- }
-
- private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
- private static Schema allowNulls(Schema base) {
- if (NULL_SCHEMA.equals(base)) {
- return base;
- }
- return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA));
- }
-
- private Avros() {}
+ /**
+ * The instance we use for generating reflected schemas. May be modified by
+ * clients (e.g., Scrunch.)
+ */
+ public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
+
+ /**
+ * The name of the configuration parameter that tracks which reflection
+ * factory to use.
+ */
+ public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
+
+ public static void configureReflectDataFactory(Configuration conf) {
+ conf.setClass(REFLECT_DATA_FACTORY_CLASS,
+ REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
+ }
+
+ public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
+ return (ReflectDataFactory) ReflectionUtils.newInstance(conf.getClass(
+ REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
+ }
+
+ /** Converts Avro string values (any CharSequence, e.g. Utf8) to Java Strings. */
+ public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
+ @Override
+ public String map(CharSequence text) {
+ return text.toString();
+ }
+ };
+
+ /** Converts Java Strings to Avro {@link Utf8} values. */
+ public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
+ @Override
+ public Utf8 map(String text) {
+ return new Utf8(text);
+ }
+ };
+
+ /**
+ * Normalizes Avro bytes values to a ByteBuffer: buffers pass through and
+ * anything else is treated as a byte[] and wrapped.
+ */
+ public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
+ @Override
+ public ByteBuffer map(Object raw) {
+ return (raw instanceof ByteBuffer) ? (ByteBuffer) raw
+ : ByteBuffer.wrap((byte[]) raw);
+ }
+ };
+
+ private static final AvroType<String> strings = new AvroType<String>(
+ String.class, Schema.create(Schema.Type.STRING), UTF8_TO_STRING,
+ STRING_TO_UTF8);
+ private static final AvroType<Void> nulls = create(Void.class,
+ Schema.Type.NULL);
+ private static final AvroType<Long> longs = create(Long.class,
+ Schema.Type.LONG);
+ private static final AvroType<Integer> ints = create(Integer.class,
+ Schema.Type.INT);
+ private static final AvroType<Float> floats = create(Float.class,
+ Schema.Type.FLOAT);
+ private static final AvroType<Double> doubles = create(Double.class,
+ Schema.Type.DOUBLE);
+ private static final AvroType<Boolean> booleans = create(Boolean.class,
+ Schema.Type.BOOLEAN);
+ private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(
+ ByteBuffer.class, Schema.create(Schema.Type.BYTES), BYTES_IN,
+ IdentityFn.getInstance());
+
+ private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
+ .<Class<?>, PType<?>> builder().put(String.class, strings)
+ .put(Long.class, longs).put(Integer.class, ints)
+ .put(Float.class, floats).put(Double.class, doubles)
+ .put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+
+ private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps
+ .newHashMap();
+
+ public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
+ EXTENSIONS.put(clazz, ptype);
+ }
+
+ public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
+ return (PType<T>) PRIMITIVES.get(clazz);
+ }
+
+ private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
+ return new AvroType<T>(clazz, Schema.create(schemaType));
+ }
+
+ /** Returns the shared type for Avro's null schema, bound to {@link Void}. */
+ public static final AvroType<Void> nulls() {
+ return nulls;
+ }
+
+ /** Returns the shared type for Java Strings, stored as Avro strings. */
+ public static final AvroType<String> strings() {
+ return strings;
+ }
+
+ /** Returns the shared type for Longs, stored as Avro longs. */
+ public static final AvroType<Long> longs() {
+ return longs;
+ }
+
+ /** Returns the shared type for Integers, stored as Avro ints. */
+ public static final AvroType<Integer> ints() {
+ return ints;
+ }
+
+ /** Returns the shared type for Floats, stored as Avro floats. */
+ public static final AvroType<Float> floats() {
+ return floats;
+ }
+
+ /** Returns the shared type for Doubles, stored as Avro doubles. */
+ public static final AvroType<Double> doubles() {
+ return doubles;
+ }
+
+ /** Returns the shared type for Booleans, stored as Avro booleans. */
+ public static final AvroType<Boolean> booleans() {
+ return booleans;
+ }
+
+ /** Returns the shared type for ByteBuffers, stored as Avro bytes. */
+ public static final AvroType<ByteBuffer> bytes() {
+ return bytes;
+ }
+
+ /**
+ * Returns the type registered for {@code clazz} via {@link #register}, or a
+ * reflection-based type (see {@link #containers}) when none is registered.
+ */
+ public static final <T> AvroType<T> records(Class<T> clazz) {
+ return EXTENSIONS.containsKey(clazz) ? (AvroType<T>) EXTENSIONS.get(clazz)
+ : containers(clazz);
+ }
+
+ /** Returns a type for {@link GenericData.Record}s conforming to {@code schema}. */
+ public static final AvroType<GenericData.Record> generics(Schema schema) {
+ return new AvroType<GenericData.Record>(GenericData.Record.class, schema);
+ }
+
+ /** Returns a reflection-based type for {@code clazz}; same as {@link #reflects}. */
+ public static final <T> AvroType<T> containers(Class<T> clazz) {
+ return reflects(clazz);
+ }
+
+ /**
+ * Returns a type whose schema is derived from {@code clazz} by the current
+ * {@link #REFLECT_DATA_FACTORY}.
+ */
+ public static final <T> AvroType<T> reflects(Class<T> clazz) {
+ Schema reflectedSchema = REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz);
+ return new AvroType<T>(clazz, reflectedSchema);
+ }
+
+ /**
+ * Deserializes Avro bytes into a freshly instantiated Writable via its
+ * readFields() method.
+ */
+ private static class BytesToWritableMapFn<T extends Writable> extends
+ MapFn<ByteBuffer, T> {
+ private static final Log LOG = LogFactory
+ .getLog(BytesToWritableMapFn.class);
+
+ private final Class<T> writableClazz;
+
+ public BytesToWritableMapFn(Class<T> writableClazz) {
+ this.writableClazz = writableClazz;
+ }
+
+ /**
+ * Reads a new instance from the readable bytes of {@code input}, i.e. from
+ * position() up to limit(). The buffer's position is left untouched. On
+ * IOException the error is logged and the (possibly partially populated)
+ * instance is still returned.
+ */
+ @Override
+ public T map(ByteBuffer input) {
+ T instance = ReflectionUtils.newInstance(writableClazz,
+ getConfiguration());
+ try {
+ instance.readFields(new DataInputStream(toInputStream(input)));
+ } catch (IOException e) {
+ LOG.error("Exception thrown reading instance of: "
+ + writableClazz, e);
+ }
+ return instance;
+ }
+
+ /** Wraps the buffer's remaining bytes in a stream without moving its position. */
+ private static ByteArrayInputStream toInputStream(ByteBuffer buffer) {
+ if (buffer.hasArray()) {
+ // position() is relative to arrayOffset() in the backing array, and
+ // the readable length is remaining() (limit - position), not limit().
+ return new ByteArrayInputStream(buffer.array(), buffer.arrayOffset()
+ + buffer.position(), buffer.remaining());
+ }
+ // Direct or read-only buffers expose no accessible array: copy instead.
+ byte[] copy = new byte[buffer.remaining()];
+ buffer.duplicate().get(copy);
+ return new ByteArrayInputStream(copy);
+ }
+ }
+
+ /** Serializes a Writable into a heap ByteBuffer via its write() method. */
+ private static class WritableToBytesMapFn<T extends Writable> extends
+ MapFn<T, ByteBuffer> {
+ private static final Log LOG = LogFactory
+ .getLog(WritableToBytesMapFn.class);
+
+ @Override
+ public ByteBuffer map(T input) {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ try {
+ input.write(new DataOutputStream(buffer));
+ } catch (IOException e) {
+ // Logged rather than rethrown; whatever was written so far is returned.
+ LOG.error("Exception thrown converting Writable to bytes", e);
+ }
+ return ByteBuffer.wrap(buffer.toByteArray());
+ }
+ }
+
+ public static final <T extends Writable> AvroType<T> writables(
+ Class<T> clazz) {
+ return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES),
+ new BytesToWritableMapFn<T>(clazz),
+ new WritableToBytesMapFn<T>());
+ }
+
+ /**
+ * Converts an Avro array value (a Collection or a Java array) into an
+ * ArrayList, applying the element type's input function to each element.
+ */
+ private static class GenericDataArrayToCollection<T> extends
+ MapFn<Object, Collection<T>> {
+
+ private final MapFn<Object, T> mapFn;
+
+ public GenericDataArrayToCollection(MapFn<Object, T> mapFn) {
+ this.mapFn = mapFn;
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ mapFn.configure(conf);
+ }
+
+ @Override
+ public void setConfigurationForTest(Configuration conf) {
+ mapFn.setConfigurationForTest(conf);
+ }
+
+ @Override
+ public void initialize() {
+ this.mapFn.setContext(getContext());
+ }
+
+ @Override
+ public Collection<T> map(Object input) {
+ Collection<T> ret = Lists.newArrayList();
+ if (input instanceof Collection) {
+ for (Object in : (Collection<Object>) input) {
+ ret.add(mapElement(in));
+ }
+ } else {
+ // Assume it is an array. Reflective access also copes with primitive
+ // arrays (e.g. int[]), which a cast to Object[] would reject.
+ int length = java.lang.reflect.Array.getLength(input);
+ for (int i = 0; i < length; i++) {
+ ret.add(mapElement(java.lang.reflect.Array.get(input, i)));
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Maps a single element. collections() declares the item schema as a
+ * union with null, so null elements are passed through without calling
+ * mapFn (mirroring GenericRecordToTuple's handling of null fields).
+ */
+ private T mapElement(Object in) {
+ return in == null ? null : mapFn.map(in);
+ }
+ }
+
+ /**
+ * Converts a Java Collection into a GenericData.Array for the given array
+ * schema, applying the element type's output function to each element.
+ */
+ private static class CollectionToGenericDataArray extends
+ MapFn<Collection<?>, GenericData.Array<?>> {
+
+ private final MapFn mapFn;
+ // The schema is kept as JSON and re-parsed lazily in map(), because the
+ // parsed Schema field below is transient.
+ private final String jsonSchema;
+ private transient Schema schema;
+
+ public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
+ this.mapFn = mapFn;
+ this.jsonSchema = schema.toString();
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ mapFn.configure(conf);
+ }
+
+ @Override
+ public void setConfigurationForTest(Configuration conf) {
+ mapFn.setConfigurationForTest(conf);
+ }
+
+ @Override
+ public void initialize() {
+ this.mapFn.setContext(getContext());
+ }
+
+ @Override
+ public GenericData.Array<?> map(Collection<?> input) {
+ if (schema == null) {
+ schema = new Schema.Parser().parse(jsonSchema);
+ }
+ GenericData.Array array = new GenericData.Array(input.size(),
+ schema);
+ for (Object in : input) {
+ // collections() declares the item schema as a union with null, so a
+ // null element is stored as-is instead of being handed to mapFn.
+ array.add(in == null ? null : mapFn.map(in));
+ }
+ return array;
+ }
+ }
+
+ /**
+ * Returns a type for Java Collections of {@code ptype} elements, stored as
+ * an Avro array whose items may be null.
+ */
+ public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
+ AvroType<T> elementType = (AvroType<T>) ptype;
+ Schema itemSchema = allowNulls(elementType.getSchema());
+ Schema collectionSchema = Schema.createArray(itemSchema);
+ GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(
+ elementType.getInputMapFn());
+ CollectionToGenericDataArray output = new CollectionToGenericDataArray(
+ collectionSchema, elementType.getOutputMapFn());
+ return new AvroType(Collection.class, collectionSchema, input, output, ptype);
+ }
+
+ /**
+ * Converts an Avro map (CharSequence keys) into a HashMap with String keys,
+ * applying the value type's input function to each value.
+ */
+ private static class AvroMapToMap<T> extends
+ MapFn<Map<CharSequence, Object>, Map<String, T>> {
+ private final MapFn<Object, T> mapFn;
+
+ public AvroMapToMap(MapFn<Object, T> mapFn) {
+ this.mapFn = mapFn;
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ mapFn.configure(conf);
+ }
+
+ @Override
+ public void setConfigurationForTest(Configuration conf) {
+ mapFn.setConfigurationForTest(conf);
+ }
+
+ @Override
+ public void initialize() {
+ this.mapFn.setContext(getContext());
+ }
+
+ @Override
+ public Map<String, T> map(Map<CharSequence, Object> input) {
+ Map<String, T> out = Maps.newHashMap();
+ for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
+ Object value = e.getValue();
+ // maps() declares the value schema as a union with null, so null
+ // values are kept as null rather than passed to mapFn.
+ out.put(e.getKey().toString(), value == null ? null : mapFn.map(value));
+ }
+ return out;
+ }
+ }
+
+ /**
+ * Converts a String-keyed Java map into an Avro map with {@link Utf8} keys,
+ * applying the value type's output function to each value.
+ */
+ private static class MapToAvroMap<T> extends
+ MapFn<Map<String, T>, Map<Utf8, Object>> {
+ private final MapFn<T, Object> mapFn;
+
+ public MapToAvroMap(MapFn<T, Object> mapFn) {
+ this.mapFn = mapFn;
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ mapFn.configure(conf);
+ }
+
+ @Override
+ public void setConfigurationForTest(Configuration conf) {
+ mapFn.setConfigurationForTest(conf);
+ }
+
+ @Override
+ public void initialize() {
+ this.mapFn.setContext(getContext());
+ }
+
+ @Override
+ public Map<Utf8, Object> map(Map<String, T> input) {
+ Map<Utf8, Object> out = Maps.newHashMap();
+ for (Map.Entry<String, T> e : input.entrySet()) {
+ T value = e.getValue();
+ // maps() declares the value schema as a union with null, so a null
+ // value is written as-is instead of being handed to mapFn.
+ out.put(new Utf8(e.getKey()), value == null ? null : mapFn.map(value));
+ }
+ return out;
+ }
+ }
+
+ /**
+ * Returns a type for String-keyed maps of {@code ptype} values, stored as an
+ * Avro map whose values may be null.
+ */
+ public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
+ AvroType<T> valueType = (AvroType<T>) ptype;
+ Schema valueSchema = allowNulls(valueType.getSchema());
+ Schema mapSchema = Schema.createMap(valueSchema);
+ AvroMapToMap<T> inputFn = new AvroMapToMap<T>(valueType.getInputMapFn());
+ MapToAvroMap<T> outputFn = new MapToAvroMap<T>(valueType.getOutputMapFn());
+ return new AvroType(Map.class, mapSchema, inputFn, outputFn, ptype);
+ }
+
+ /**
+ * Converts an Avro record whose fields hold tuple elements by position into
+ * a Crunch {@link Tuple}, applying each element type's input function to
+ * non-null fields.
+ */
+ private static class GenericRecordToTuple extends
+ MapFn<GenericRecord, Tuple> {
+ private final TupleFactory<?> tupleFactory;
+ private final List<MapFn> fns = Lists.newArrayList();
+
+ // Scratch buffer reused by every map() call; sized in initialize().
+ private transient Object[] values;
+
+ public GenericRecordToTuple(TupleFactory<?> tupleFactory,
+ PType<?>... ptypes) {
+ this.tupleFactory = tupleFactory;
+ for (PType<?> ptype : ptypes) {
+ fns.add(((AvroType) ptype).getInputMapFn());
+ }
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ for (MapFn elementFn : fns) {
+ elementFn.configure(conf);
+ }
+ }
+
+ @Override
+ public void setConfigurationForTest(Configuration conf) {
+ for (MapFn elementFn : fns) {
+ elementFn.setConfigurationForTest(conf);
+ }
+ }
+
+ @Override
+ public void initialize() {
+ for (MapFn elementFn : fns) {
+ elementFn.setContext(getContext());
+ }
+ values = new Object[fns.size()];
+ tupleFactory.initialize();
+ }
+
+ @Override
+ public Tuple map(GenericRecord input) {
+ for (int i = 0; i < values.length; i++) {
+ Object field = input.get(i);
+ values[i] = (field == null) ? null : fns.get(i).map(field);
+ }
+ return tupleFactory.makeTuple(values);
+ }
+ }
+
+ /**
+ * Converts a Crunch {@link Tuple} into an Avro record of the given schema,
+ * applying each element type's output function to non-null elements.
+ * Note: one record instance is created in initialize() and the same
+ * instance is filled and returned by every map() call.
+ */
+ private static class TupleToGenericRecord extends
+ MapFn<Tuple, GenericRecord> {
+ private final List<MapFn> fns = Lists.newArrayList();
+ private final String jsonSchema;
+
+ private transient GenericRecord record;
+
+ public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
+ this.jsonSchema = schema.toString();
+ for (PType<?> ptype : ptypes) {
+ fns.add(((AvroType) ptype).getOutputMapFn());
+ }
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ for (MapFn elementFn : fns) {
+ elementFn.configure(conf);
+ }
+ }
+
+ @Override
+ public void setConfigurationForTest(Configuration conf) {
+ for (MapFn elementFn : fns) {
+ elementFn.setConfigurationForTest(conf);
+ }
+ }
+
+ @Override
+ public void initialize() {
+ Schema parsedSchema = new Schema.Parser().parse(jsonSchema);
+ record = new GenericData.Record(parsedSchema);
+ for (MapFn elementFn : fns) {
+ elementFn.setContext(getContext());
+ }
+ }
+
+ @Override
+ public GenericRecord map(Tuple input) {
+ for (int i = 0; i < input.size(); i++) {
+ Object element = input.get(i);
+ record.put(i, (element == null) ? null : fns.get(i).map(element));
+ }
+ return record;
+ }
+ }
+
+ /** Returns a type for {@link Pair}s, stored as a two-field Avro record. */
+ public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1,
+ PType<V2> p2) {
+ Schema schema = createTupleSchema(p1, p2);
+ return new AvroType(Pair.class, schema,
+ new GenericRecordToTuple(TupleFactory.PAIR, p1, p2),
+ new TupleToGenericRecord(schema, p1, p2), p1, p2);
+ }
+
+ /** Returns a type for {@link Tuple3}s, stored as a three-field Avro record. */
+ public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(
+ PType<V1> p1, PType<V2> p2, PType<V3> p3) {
+ Schema schema = createTupleSchema(p1, p2, p3);
+ GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3);
+ TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2, p3);
+ return new AvroType(Tuple3.class, schema, input, output, p1, p2, p3);
+ }
+
+ /** Returns a type for {@link Tuple4}s, stored as a four-field Avro record. */
+ public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(
+ PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+ Schema schema = createTupleSchema(p1, p2, p3, p4);
+ GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4);
+ TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2, p3, p4);
+ return new AvroType(Tuple4.class, schema, input, output, p1, p2, p3, p4);
+ }
+
+ /** Returns a type for {@link TupleN}s, stored as an Avro record with one field per type. */
+ public static final AvroType<TupleN> tuples(PType... ptypes) {
+ Schema schema = createTupleSchema(ptypes);
+ GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes);
+ TupleToGenericRecord output = new TupleToGenericRecord(schema, ptypes);
+ return new AvroType(TupleN.class, schema, input, output, ptypes);
+ }
+
+ /**
+ * Returns a type for a custom Tuple subclass. Instances are built by a
+ * TupleFactory created from {@code clazz} and the element type classes.
+ */
+ public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz,
+ PType... ptypes) {
+ Schema schema = createTupleSchema(ptypes);
+ Class[] typeArgs = new Class[ptypes.length];
+ int index = 0;
+ for (PType ptype : ptypes) {
+ typeArgs[index++] = ptype.getTypeClass();
+ }
+ TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
+ GenericRecordToTuple input = new GenericRecordToTuple(factory, ptypes);
+ TupleToGenericRecord output = new TupleToGenericRecord(schema, ptypes);
+ return new AvroType<T>(clazz, schema, input, output, ptypes);
+ }
+
+ /**
+ * Builds a record schema with one nullable field per type, named v0, v1, ...
+ * The record name gets a random UUID-based suffix so that every tuple schema
+ * has a globally unique name.
+ */
+ private static Schema createTupleSchema(PType<?>... ptypes) {
+ String uniqueSuffix = UUID.randomUUID().toString().replace('-', 'x');
+ Schema schema = Schema.createRecord("tuple" + uniqueSuffix, "", "crunch", false);
+ List<Schema.Field> fields = Lists.newArrayList();
+ int index = 0;
+ for (PType<?> ptype : ptypes) {
+ Schema fieldSchema = allowNulls(((AvroType) ptype).getSchema());
+ fields.add(new Schema.Field("v" + index, fieldSchema, "", null));
+ index++;
+ }
+ schema.setFields(fields);
+ return schema;
+ }
+
+ /**
+ * Returns a type for {@code clazz} that reuses the schema of {@code base},
+ * chaining {@code inputFn}/{@code outputFn} onto base's own conversions.
+ */
+ public static final <S, T> AvroType<T> derived(Class<T> clazz,
+ MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
+ AvroType<S> abase = (AvroType<S>) base;
+ Schema baseSchema = abase.getSchema();
+ CompositeMapFn composedInput = new CompositeMapFn(abase.getInputMapFn(), inputFn);
+ CompositeMapFn composedOutput = new CompositeMapFn(outputFn, abase.getOutputMapFn());
+ PType[] subTypes = base.getSubTypes().toArray(new PType[0]);
+ return new AvroType<T>(clazz, baseSchema, composedInput, composedOutput, subTypes);
+ }
+
+ /** Returns a type that stores {@code clazz} instances as JSON strings. */
+ public static <T> PType<T> jsons(Class<T> clazz) {
+ return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
+ }
+
+ /** Returns a key/value table type built from two Avro types. */
+ public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key,
+ PType<V> value) {
+ return new AvroTableType((AvroType<K>) key, (AvroType<V>) value, Pair.class);
+ }
+
+ private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
+
+ /**
+ * Returns a schema that accepts both the values of {@code base} and null.
+ * <ul>
+ * <li>The null schema itself is returned unchanged.</li>
+ * <li>A union that already contains null is returned unchanged.</li>
+ * <li>Any other union gets null appended to its branches, because Avro
+ * does not allow a union to directly contain another union.</li>
+ * <li>Any other schema becomes the union {@code [base, null]}.</li>
+ * </ul>
+ */
+ private static Schema allowNulls(Schema base) {
+ if (NULL_SCHEMA.equals(base)) {
+ return base;
+ }
+ if (base.getType() == Type.UNION) {
+ List<Schema> branches = base.getTypes();
+ if (branches.contains(NULL_SCHEMA)) {
+ return base;
+ }
+ List<Schema> withNull = Lists.newArrayList(branches);
+ withNull.add(NULL_SCHEMA);
+ return Schema.createUnion(withNull);
+ }
+ return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA));
+ }
+
+ /** Every member of this class is static; it is not meant to be instantiated. */
+ private Avros() {
+ // intentionally empty
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java b/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
index 9739d17..390fbf1 100644
--- a/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
+++ b/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
@@ -14,15 +14,12 @@
*/
package com.cloudera.crunch.io.avro;
-import static com.google.common.io.Resources.getResource;
-import static com.google.common.io.Resources.newInputStreamSupplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
@@ -31,6 +28,7 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,17 +39,15 @@ import org.junit.Test;
import com.cloudera.crunch.test.Person;
import com.cloudera.crunch.types.avro.Avros;
import com.google.common.collect.Lists;
-import com.google.common.io.InputSupplier;
public class AvroFileReaderFactoryTest {
private File avroFile;
- private Schema schema;
@Before
public void setUp() throws IOException {
- InputSupplier<InputStream> inputStreamSupplier = newInputStreamSupplier(getResource("person.avro"));
- schema = new Schema.Parser().parse(inputStreamSupplier.getInput());
+ // InputSupplier<InputStream> inputStreamSupplier =
+ // newInputStreamSupplier(getResource("person.avro"));
avroFile = File.createTempFile("test", ".av");
}
@@ -60,15 +56,15 @@ public class AvroFileReaderFactoryTest {
avroFile.delete();
}
- private void populateGenericFile(List<GenericRecord> genericRecords)
- throws IOException {
+ private void populateGenericFile(List<GenericRecord> genericRecords,
+ Schema outputSchema) throws IOException {
FileOutputStream outputStream = new FileOutputStream(this.avroFile);
GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
- schema);
+ outputSchema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
genericDatumWriter);
- dataFileWriter.create(schema, outputStream);
+ dataFileWriter.create(outputSchema, outputStream);
for (GenericRecord record : genericRecords) {
dataFileWriter.append(record);
@@ -81,17 +77,17 @@ public class AvroFileReaderFactoryTest {
@Test
public void testRead_GenericReader() throws IOException {
- GenericRecord savedRecord = new GenericData.Record(schema);
+ GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
savedRecord.put("name", "John Doe");
savedRecord.put("age", 42);
savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
- populateGenericFile(Lists.newArrayList(savedRecord));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
AvroFileReaderFactory<GenericData.Record> genericReader = new AvroFileReaderFactory<GenericData.Record>(
- Avros.generics(schema), new Configuration());
+ Avros.generics(Person.SCHEMA$), new Configuration());
Iterator<GenericData.Record> recordIterator = genericReader.read(
- FileSystem.getLocal(new Configuration()),
- new Path(this.avroFile.getAbsolutePath()));
+ FileSystem.getLocal(new Configuration()), new Path(
+ this.avroFile.getAbsolutePath()));
GenericRecord genericRecord = recordIterator.next();
assertEquals(savedRecord, genericRecord);
@@ -100,16 +96,16 @@ public class AvroFileReaderFactoryTest {
@Test
public void testRead_SpecificReader() throws IOException {
- GenericRecord savedRecord = new GenericData.Record(schema);
+ GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
savedRecord.put("name", "John Doe");
savedRecord.put("age", 42);
savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
- populateGenericFile(Lists.newArrayList(savedRecord));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
AvroFileReaderFactory<Person> genericReader = new AvroFileReaderFactory<Person>(
Avros.records(Person.class), new Configuration());
- Iterator<Person> recordIterator = genericReader.read(
- FileSystem.getLocal(new Configuration()),
+ Iterator<Person> recordIterator = genericReader.read(FileSystem
+ .getLocal(new Configuration()),
new Path(this.avroFile.getAbsolutePath()));
Person expectedPerson = new Person();
@@ -125,4 +121,35 @@ public class AvroFileReaderFactoryTest {
assertEquals(expectedPerson, person);
assertFalse(recordIterator.hasNext());
}
+
+ @Test
+ public void testRead_ReflectReader() throws IOException {
+ // Write one PojoPerson-shaped record through the generic API...
+ Schema pojoSchema = ReflectData.get().getSchema(PojoPerson.class);
+ GenericRecord written = new GenericData.Record(pojoSchema);
+ written.put("name", "John Doe");
+ populateGenericFile(Lists.newArrayList(written), pojoSchema);
+
+ // ...then read it back as a POJO through the reflection-based type.
+ AvroFileReaderFactory<PojoPerson> reflectReader = new AvroFileReaderFactory<PojoPerson>(
+ Avros.reflects(PojoPerson.class), new Configuration());
+ Path avroPath = new Path(this.avroFile.getAbsolutePath());
+ Iterator<PojoPerson> people = reflectReader.read(
+ FileSystem.getLocal(new Configuration()), avroPath);
+
+ assertEquals("John Doe", people.next().getName());
+ assertFalse(people.hasNext());
+ }
+
+ /**
+ * Plain Java bean whose Avro schema is derived by reflection in these tests
+ * (also reused by AvroFileSourceTargetTest).
+ */
+ public static class PojoPerson {
+ private String name;
+
+ public String getName() {
+ return this.name;
+ }
+
+ public void setName(String newName) {
+ name = newName;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java b/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
index afe22f9..ce43a98 100644
--- a/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
+++ b/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -36,6 +37,7 @@ import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.io.At;
+import com.cloudera.crunch.io.avro.AvroFileReaderFactoryTest.PojoPerson;
import com.cloudera.crunch.test.Person;
import com.cloudera.crunch.types.avro.Avros;
import com.google.common.collect.Lists;
@@ -55,14 +57,15 @@ public class AvroFileSourceTargetTest implements Serializable {
avroFile.delete();
}
- private void populateGenericFile(List<GenericRecord> genericRecords) throws IOException {
+ private void populateGenericFile(List<GenericRecord> genericRecords,
+ Schema schema) throws IOException {
FileOutputStream outputStream = new FileOutputStream(this.avroFile);
GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
- Person.SCHEMA$);
+ schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
genericDatumWriter);
- dataFileWriter.create(Person.SCHEMA$, outputStream);
+ dataFileWriter.create(schema, outputStream);
for (GenericRecord record : genericRecords) {
dataFileWriter.append(record);
@@ -79,13 +82,14 @@ public class AvroFileSourceTargetTest implements Serializable {
savedRecord.put("name", "John Doe");
savedRecord.put("age", 42);
savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
- populateGenericFile(Lists.newArrayList(savedRecord));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
- PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
- Avros.records(Person.class)));
+ PCollection<Person> genericCollection = pipeline.read(At.avroFile(
+ avroFile.getAbsolutePath(), Avros.records(Person.class)));
- List<Person> personList = Lists.newArrayList(genericCollection.materialize());
+ List<Person> personList = Lists.newArrayList(genericCollection
+ .materialize());
Person expectedPerson = new Person();
expectedPerson.setName("John Doe");
@@ -96,25 +100,51 @@ public class AvroFileSourceTargetTest implements Serializable {
siblingNames.add("Jane");
expectedPerson.setSiblingnames(siblingNames);
- assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
+ assertEquals(Lists.newArrayList(expectedPerson),
+ Lists.newArrayList(personList));
}
@Test
public void testGeneric() throws IOException {
- String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
- Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+ String genericSchemaJson = Person.SCHEMA$.toString().replace("Person",
+ "GenericPerson");
+ Schema genericPersonSchema = new Schema.Parser()
+ .parse(genericSchemaJson);
GenericRecord savedRecord = new GenericData.Record(genericPersonSchema);
savedRecord.put("name", "John Doe");
savedRecord.put("age", 42);
savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
- populateGenericFile(Lists.newArrayList(savedRecord));
+ populateGenericFile(Lists.newArrayList(savedRecord),
+ genericPersonSchema);
Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
- PCollection<Record> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
- Avros.generics(genericPersonSchema)));
+ PCollection<Record> genericCollection = pipeline
+ .read(At.avroFile(avroFile.getAbsolutePath(),
+ Avros.generics(genericPersonSchema)));
- List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
+ List<Record> recordList = Lists.newArrayList(genericCollection
+ .materialize());
- assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
+ assertEquals(Lists.newArrayList(savedRecord),
+ Lists.newArrayList(recordList));
+ }
+
+ @Test
+ public void testReflect() throws IOException {
+ // Write one record via the generic API using PojoPerson's reflected schema.
+ Schema schema = ReflectData.get().getSchema(PojoPerson.class);
+ GenericRecord written = new GenericData.Record(schema);
+ written.put("name", "John Doe");
+ populateGenericFile(Lists.newArrayList(written), schema);
+
+ // Read it back through a pipeline as a reflection-based POJO.
+ Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
+ String path = avroFile.getAbsolutePath();
+ PCollection<PojoPerson> people = pipeline.read(At.avroFile(path,
+ Avros.reflects(PojoPerson.class)));
+ List<PojoPerson> peopleList = Lists.newArrayList(people.materialize());
+
+ assertEquals(1, peopleList.size());
+ assertEquals("John Doe", peopleList.get(0).getName());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java b/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java
new file mode 100644
index 0000000..b0e2e87
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java
@@ -0,0 +1,98 @@
+package com.cloudera.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.MapFn;
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.test.FileHelper;
+import com.cloudera.crunch.types.avro.Avros;
+import com.google.common.collect.Lists;
+
+public class AvroReflectTest implements Serializable {
+
+ /** Minimal bean wrapping one string, serialized via Avro reflection. */
+ static class StringWrapper {
+ private String value;
+
+ public StringWrapper() {
+ this(null);
+ }
+
+ public StringWrapper(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("<StringWrapper(%s)>", value);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 + (value == null ? 0 : value.hashCode());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ String otherValue = ((StringWrapper) obj).value;
+ return (value == null) ? (otherValue == null) : value.equals(otherValue);
+ }
+ }
+
+ /**
+ * Wraps each line of set1.txt in a StringWrapper using a reflection-based
+ * Avro type, then checks the materialized wrappers against the expected
+ * values.
+ */
+ @Test
+ public void testReflection() throws IOException {
+ MapFn<String, StringWrapper> wrapFn = new MapFn<String, StringWrapper>() {
+ @Override
+ public StringWrapper map(String line) {
+ StringWrapper wrapper = new StringWrapper();
+ wrapper.setValue(line);
+ return wrapper;
+ }
+ };
+ Pipeline pipeline = new MRPipeline(AvroReflectTest.class);
+ PCollection<StringWrapper> wrapped = pipeline
+ .readTextFile(FileHelper.createTempCopyOf("set1.txt"))
+ .parallelDo(wrapFn, Avros.reflects(StringWrapper.class));
+
+ List<StringWrapper> actual = Lists.newArrayList(wrapped.materialize());
+ pipeline.done();
+
+ List<StringWrapper> expected = Lists.newArrayList(new StringWrapper("b"),
+ new StringWrapper("c"), new StringWrapper("a"), new StringWrapper("e"));
+ assertEquals(expected, actual);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fe53b71f/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
index acfb3e6..d2c2ab9 100644
--- a/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
+++ b/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
@@ -15,23 +15,52 @@ public class AvroTypeTest {
}
@Test
+ public void testIsGeneric_SpecificData() {
+ assertFalse(Avros.records(Person.class).isGeneric());
+ }
+
+ @Test
public void testIsSpecific_GenericData() {
assertFalse(Avros.generics(Person.SCHEMA$).isSpecific());
}
@Test
+ public void testIsGeneric_GenericData() {
+ assertTrue(Avros.generics(Person.SCHEMA$).isGeneric());
+ }
+
+ @Test
public void testIsSpecific_NonAvroClass() {
assertFalse(Avros.ints().isSpecific());
}
-
+
+ @Test
+ public void testIsGeneric_NonAvroClass() {
+ // Built-in primitive types are not generic records.
+ AvroType<Integer> intType = Avros.ints();
+ assertFalse(intType.isGeneric());
+ }
+
@Test
public void testIsSpecific_SpecificAvroTable() {
- assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isSpecific());
+ assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class))
+ .isSpecific());
+ }
+
+ @Test
+ public void testIsGeneric_SpecificAvroTable() {
+ assertFalse(Avros.tableOf(Avros.strings(), Avros.records(Person.class))
+ .isGeneric());
}
-
+
@Test
public void testIsSpecific_GenericAvroTable() {
- assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isSpecific());
+ assertFalse(Avros.tableOf(Avros.strings(),
+ Avros.generics(Person.SCHEMA$)).isSpecific());
+ }
+
+ @Test
+ public void testIsGeneric_GenericAvroTable() {
+ assertTrue(Avros.tableOf(Avros.strings(),
+ Avros.generics(Person.SCHEMA$)).isGeneric());
}
}