You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/08/09 22:59:55 UTC
git commit: Correct Avro deep copying on Pair and Collection.
Updated Branches:
refs/heads/master 547d56fa8 -> 153617a44
Correct Avro deep copying on Pair and Collection.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/153617a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/153617a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/153617a4
Branch: refs/heads/master
Commit: 153617a4444e439b6fe9dc06d3d4c5a3859307da
Parents: 547d56f
Author: Gabriel Reid <ga...@gmail.com>
Authored: Thu Aug 9 22:53:55 2012 +0200
Committer: Gabriel Reid <ga...@gmail.com>
Committed: Thu Aug 9 22:53:55 2012 +0200
----------------------------------------------------------------------
.../apache/crunch/types/CollectionDeepCopier.java | 49 ++++++++++
.../java/org/apache/crunch/types/DeepCopier.java | 37 ++++++++
.../org/apache/crunch/types/TupleDeepCopier.java | 52 +++++++++++
.../java/org/apache/crunch/types/TupleFactory.java | 23 +++++
.../apache/crunch/types/avro/AvroDeepCopier.java | 13 ++-
.../org/apache/crunch/types/avro/AvroType.java | 24 ++++--
.../java/org/apache/crunch/types/avro/Avros.java | 64 ++++++++------
.../crunch/types/CollectionDeepCopierTest.java | 31 +++++++
.../apache/crunch/types/TupleDeepCopierTest.java | 49 ++++++++++
.../org/apache/crunch/types/TupleFactoryTest.java | 69 +++++++++++++++
.../crunch/types/avro/AvroDeepCopierTest.java | 10 ++-
.../org/apache/crunch/types/avro/AvroTypeTest.java | 50 +++++++++--
12 files changed, 421 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
new file mode 100644
index 0000000..5bdc715
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.thirdparty.guava.common.collect.Lists;
+
+/**
+ * Performs deep copies (based on underlying PType deep copying) of Collections.
+ *
+ * @param <T>
+ * The type of Tuple implementation being copied
+ */
+public class CollectionDeepCopier<T> implements DeepCopier<Collection<T>> {
+
+
+ private PType<T> elementType;
+
+ public CollectionDeepCopier(PType<T> elementType) {
+ this.elementType = elementType;
+ }
+
+ @Override
+ public Collection<T> deepCopy(Collection<T> source) {
+ List<T> copiedCollection = Lists.newArrayListWithCapacity(source.size());
+ for (T value : source){
+ copiedCollection.add(elementType.getDetachedValue(value));
+ }
+ return copiedCollection;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
new file mode 100644
index 0000000..539b808
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+/**
+ * Performs deep copies of values.
+ *
+ * @param <T>
+ * The type of value that will be copied
+ */
+public interface DeepCopier<T> {
+
+ /**
+ * Create a deep copy of a value.
+ *
+ * @param source
+ * The value to be copied
+ * @return The deep copy of the value
+ */
+ T deepCopy(T source);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
new file mode 100644
index 0000000..094b582
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.List;
+
+import org.apache.crunch.Tuple;
+
+/**
+ * Performs deep copies (based on underlying PType deep copying) of Tuple-based
+ * objects.
+ *
+ * @param <T>
+ * The type of Tuple implementation being copied
+ */
+public class TupleDeepCopier<T extends Tuple> implements DeepCopier<T> {
+
+ private final TupleFactory<T> tupleFactory;
+ private final List<PType> elementTypes;
+
+ public TupleDeepCopier(PType<T> ptype) {
+ tupleFactory = TupleFactory.getTupleFactory(ptype.getTypeClass());
+ elementTypes = ptype.getSubTypes();
+ }
+
+ @Override
+ public T deepCopy(T source) {
+ Object[] deepCopyValues = new Object[source.size()];
+
+ for (int valueIndex = 0; valueIndex < elementTypes.size(); valueIndex++) {
+ PType elementType = elementTypes.get(valueIndex);
+ deepCopyValues[valueIndex] = elementType.getDetachedValue(source.get(valueIndex));
+ }
+
+ return tupleFactory.makeTuple(deepCopyValues);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
index 16c3bcd..c547cd6 100644
--- a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
@@ -34,6 +34,28 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable {
public abstract T makeTuple(Object... values);
+ /**
+ * Get the {@link TupleFactory} for a given Tuple implementation. Only
+ * standard Tuple implementations are supported.
+ *
+ * @param tupleClass
+ * The class for which the factory is to be retrieved
+ * @return The appropriate TupleFactory
+ */
+ public static <T extends Tuple> TupleFactory<T> getTupleFactory(Class<T> tupleClass) {
+ if (tupleClass == Pair.class) {
+ return (TupleFactory<T>) PAIR;
+ } else if (tupleClass == Tuple3.class) {
+ return (TupleFactory<T>) TUPLE3;
+ } else if (tupleClass == Tuple4.class) {
+ return (TupleFactory<T>) TUPLE4;
+ } else if (tupleClass == TupleN.class) {
+ return (TupleFactory<T>) TUPLEN;
+ } else {
+ throw new IllegalArgumentException("Can't create TupleFactory for " + tupleClass);
+ }
+ }
+
public static final TupleFactory<Pair> PAIR = new TupleFactory<Pair>() {
@Override
public Pair makeTuple(Object... values) {
@@ -96,4 +118,5 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable {
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 078353a..ad5ba04 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -36,6 +36,7 @@ import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.types.DeepCopier;
/**
* Performs deep copies of Avro-serializable objects.
@@ -45,7 +46,7 @@ import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
* running in its own JVM, but it may well be a problem in any other kind of
* multi-threaded context.
*/
-public abstract class AvroDeepCopier<T> implements Serializable {
+public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
private BinaryEncoder binaryEncoder;
private BinaryDecoder binaryDecoder;
@@ -67,7 +68,7 @@ public abstract class AvroDeepCopier<T> implements Serializable {
private Class<T> valueClass;
public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
- super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader(schema));
+ super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader<T>(schema));
this.valueClass = valueClass;
}
@@ -113,6 +114,10 @@ public abstract class AvroDeepCopier<T> implements Serializable {
}
}
+ public static class AvroTupleDeepCopier {
+
+ }
+
/**
* Create a deep copy of an Avro value.
*
@@ -120,6 +125,7 @@ public abstract class AvroDeepCopier<T> implements Serializable {
* The value to be copied
* @return The deep copy of the value
*/
+ @Override
public T deepCopy(T source) {
ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
@@ -127,7 +133,8 @@ public abstract class AvroDeepCopier<T> implements Serializable {
try {
datumWriter.write(source, binaryEncoder);
binaryEncoder.flush();
- binaryDecoder = DecoderFactory.get().binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+ binaryDecoder = DecoderFactory.get()
+ .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
datumReader.read(target, binaryDecoder);
} catch (Exception e) {
throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index b3ce576..8d7fbd5 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -17,6 +17,7 @@
*/
package org.apache.crunch.types.avro;
+import java.util.Collection;
import java.util.List;
import org.apache.avro.Schema;
@@ -25,11 +26,15 @@ import org.apache.avro.specific.SpecificRecord;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.MapFn;
import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Tuple;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.io.avro.AvroFileSourceTarget;
+import org.apache.crunch.types.CollectionDeepCopier;
import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.DeepCopier;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.TupleDeepCopier;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Preconditions;
@@ -50,13 +55,14 @@ public class AvroType<T> implements PType<T> {
private final MapFn baseInputMapFn;
private final MapFn baseOutputMapFn;
private final List<PType> subTypes;
- private AvroDeepCopier<T> deepCopier;
+ private DeepCopier<T> deepCopier;
public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), ptypes);
}
- public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, PType... ptypes) {
+ public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
+ PType... ptypes) {
this.typeClass = typeClass;
this.schema = Preconditions.checkNotNull(schema);
this.schemaString = schema.toString();
@@ -125,7 +131,8 @@ public class AvroType<T> implements PType<T> {
return false;
}
- return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class.isAssignableFrom(typeClass));
+ return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class
+ .isAssignableFrom(typeClass));
}
public MapFn<Object, T> getInputMapFn() {
@@ -146,9 +153,13 @@ public class AvroType<T> implements PType<T> {
return new AvroFileSourceTarget<T>(path, this);
}
- private AvroDeepCopier<T> getDeepCopier() {
+ private DeepCopier<T> getDeepCopier() {
if (deepCopier == null) {
- if (isSpecific()) {
+ if (Tuple.class.isAssignableFrom(this.typeClass)) {
+ deepCopier = new TupleDeepCopier(this);
+ } else if (Collection.class.isAssignableFrom(this.typeClass)){
+ deepCopier = new CollectionDeepCopier(this.subTypes.get(0));
+ } else if (isSpecific()) {
deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema());
} else if (isGeneric()) {
deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema());
@@ -160,7 +171,8 @@ public class AvroType<T> implements PType<T> {
}
public T getDetachedValue(T value) {
- if (this.baseInputMapFn instanceof IdentityFn && !Avros.isPrimitive(this)) {
+
+ if (!Avros.isPrimitive(this)) {
return getDeepCopier().deepCopy(value);
}
return value;
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index f8214a1..a6d7169 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -77,7 +77,8 @@ public class Avros {
public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
public static void configureReflectDataFactory(Configuration conf) {
- conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
+ conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(),
+ ReflectDataFactory.class);
}
public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
@@ -109,8 +110,8 @@ public class Avros {
}
};
- private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING),
- UTF8_TO_STRING, STRING_TO_UTF8);
+ private static final AvroType<String> strings = new AvroType<String>(String.class,
+ Schema.create(Schema.Type.STRING), UTF8_TO_STRING, STRING_TO_UTF8);
private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
@@ -120,9 +121,10 @@ public class Avros {
private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance());
- private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
- .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
- .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+ private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
+ .<Class<?>, PType<?>> builder().put(String.class, strings).put(Long.class, longs)
+ .put(Integer.class, ints).put(Float.class, floats).put(Double.class, doubles)
+ .put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
@@ -206,8 +208,8 @@ public class Avros {
public T map(ByteBuffer input) {
T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration());
try {
- instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input.arrayOffset(), input
- .limit())));
+ instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input
+ .arrayOffset(), input.limit())));
} catch (IOException e) {
LOG.error("Exception thrown reading instance of: " + writableClazz, e);
}
@@ -232,8 +234,8 @@ public class Avros {
}
public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
- return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
- new WritableToBytesMapFn<T>());
+ return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(
+ clazz), new WritableToBytesMapFn<T>());
}
private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
@@ -277,7 +279,8 @@ public class Avros {
}
}
- private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
+ private static class CollectionToGenericDataArray extends
+ MapFn<Collection<?>, GenericData.Array<?>> {
private final MapFn mapFn;
private final String jsonSchema;
@@ -319,8 +322,10 @@ public class Avros {
public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
AvroType<T> avroType = (AvroType<T>) ptype;
Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
- GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
- CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
+ GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(
+ avroType.getInputMapFn());
+ CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema,
+ avroType.getOutputMapFn());
return new AvroType(Collection.class, collectionSchema, input, output, ptype);
}
@@ -520,23 +525,25 @@ public class Avros {
return new AvroType(Pair.class, schema, input, output, p1, p2);
}
- public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
+ public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2,
+ PType<V3> p3) {
Schema schema = createTupleSchema(p1, p2, p3);
- return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
- new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3);
+ return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2,
+ p3), new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3);
}
- public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3,
- PType<V4> p4) {
+ public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
+ PType<V2> p2, PType<V3> p3, PType<V4> p4) {
Schema schema = createTupleSchema(p1, p2, p3, p4);
- return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
- new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4);
+ return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2,
+ p3, p4), new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4);
}
public static final AvroType<TupleN> tuples(PType... ptypes) {
Schema schema = createTupleSchema(ptypes);
- return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
- new TupleToGenericRecord(schema, ptypes), ptypes);
+ return new AvroType(TupleN.class, schema,
+ new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes), new TupleToGenericRecord(schema,
+ ptypes), ptypes);
}
public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
@@ -546,8 +553,8 @@ public class Avros {
typeArgs[i] = ptypes[i].getTypeClass();
}
TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
- return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema,
- ptypes), ptypes);
+ return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes),
+ new TupleToGenericRecord(schema, ptypes), ptypes);
}
private static Schema createTupleSchema(PType<?>... ptypes) {
@@ -564,11 +571,12 @@ public class Avros {
return schema;
}
- public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
- PType<S> base) {
+ public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
+ MapFn<T, S> outputFn, PType<S> base) {
AvroType<S> abase = (AvroType<S>) base;
- return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn),
- new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray(new PType[0]));
+ return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(),
+ inputFn), new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray(
+ new PType[0]));
}
public static <T> PType<T> jsons(Class<T> clazz) {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
new file mode 100644
index 0000000..7e532e8
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
@@ -0,0 +1,31 @@
+package org.apache.crunch.types;
+
+import static org.junit.Assert.*;
+
+import java.util.Collection;
+
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class CollectionDeepCopierTest {
+
+ @Test
+ public void testDeepCopy() {
+ Person person = new Person();
+ person.setAge(42);
+ person.setName("John Smith");
+ person.setSiblingnames(Lists.<CharSequence> newArrayList());
+
+ Collection<Person> personCollection = Lists.newArrayList(person);
+ CollectionDeepCopier<Person> collectionDeepCopier = new CollectionDeepCopier<Person>(Avros.records(Person.class));
+
+ Collection<Person> deepCopyCollection = collectionDeepCopier.deepCopy(personCollection);
+
+ assertEquals(personCollection, deepCopyCollection);
+ assertNotSame(personCollection.iterator().next(), deepCopyCollection.iterator().next());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
new file mode 100644
index 0000000..8a3a12f
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TupleDeepCopierTest {
+
+ @Test
+ public void testDeepCopy_Pair() {
+ Person person = new Person();
+ person.setName("John Doe");
+ person.setAge(42);
+ person.setSiblingnames(Lists.<CharSequence> newArrayList());
+
+ Pair<Integer, Person> inputPair = Pair.of(1, person);
+ DeepCopier<Pair<Integer, Person>> deepCopier = new TupleDeepCopier<Pair<Integer, Person>>(
+ Avros.pairs(Avros.ints(), Avros.records(Person.class)));
+
+ Pair<Integer, Person> deepCopyPair = deepCopier.deepCopy(inputPair);
+
+ assertEquals(inputPair, deepCopyPair);
+ assertNotSame(inputPair.second(), deepCopyPair.second());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
new file mode 100644
index 0000000..25b0371
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.junit.Test;
+
+public class TupleFactoryTest {
+
+ @Test
+ public void testGetTupleFactory_Pair() {
+ assertEquals(TupleFactory.PAIR, TupleFactory.getTupleFactory(Pair.class));
+ }
+
+ @Test
+ public void testGetTupleFactory_Tuple3() {
+ assertEquals(TupleFactory.TUPLE3, TupleFactory.getTupleFactory(Tuple3.class));
+ }
+
+ @Test
+ public void testGetTupleFactory_Tuple4() {
+ assertEquals(TupleFactory.TUPLE4, TupleFactory.getTupleFactory(Tuple4.class));
+ }
+
+ @Test
+ public void testGetTupleFactory_TupleN() {
+ assertEquals(TupleFactory.TUPLEN, TupleFactory.getTupleFactory(TupleN.class));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetTupleFactory_CustomTupleClass() {
+ TupleFactory.getTupleFactory(CustomTupleImplementation.class);
+ }
+
+ private static class CustomTupleImplementation implements Tuple {
+
+ @Override
+ public Object get(int index) {
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
index d966dfe..fa1b4c4 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -36,7 +36,8 @@ public class AvroDeepCopierTest {
person.setAge(42);
person.setSiblingnames(Lists.<CharSequence> newArrayList());
- Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$).deepCopy(person);
+ Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
+ .deepCopy(person);
assertEquals(person, deepCopyPerson);
assertNotSame(person, deepCopyPerson);
@@ -49,7 +50,8 @@ public class AvroDeepCopierTest {
record.put("age", 42);
record.put("siblingnames", Lists.newArrayList());
- Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(record);
+ Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$)
+ .deepCopy(record);
assertEquals(record, deepCopyRecord);
assertNotSame(record, deepCopyRecord);
@@ -62,8 +64,8 @@ public class AvroDeepCopierTest {
person.setAge(42);
person.setSiblingnames(Lists.<CharSequence> newArrayList());
- Person deepCopyPerson = new AvroDeepCopier.AvroReflectDeepCopier<Person>(Person.class, Person.SCHEMA$)
- .deepCopy(person);
+ Person deepCopyPerson = new AvroDeepCopier.AvroReflectDeepCopier<Person>(Person.class,
+ Person.SCHEMA$).deepCopy(person);
assertEquals(person, deepCopyPerson);
assertNotSame(person, deepCopyPerson);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
index 2a80a5e..486bd1a 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
@@ -23,8 +23,12 @@ import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import java.util.Collection;
+import java.util.List;
+
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
+import org.apache.crunch.Pair;
import org.apache.crunch.test.Person;
import org.apache.crunch.test.StringWrapper;
import org.junit.Test;
@@ -152,15 +156,19 @@ public class AvroTypeTest {
assertEquals(record, detachedRecord);
assertNotSame(record, detachedRecord);
}
-
- @Test
- public void testGetDetachedValue_SpecificAvroType() {
- AvroType<Person> specificType = Avros.records(Person.class);
+
+ private Person createPerson(){
Person person = new Person();
person.setName("name value");
person.setAge(42);
person.setSiblingnames(Lists.<CharSequence> newArrayList());
+ return person;
+ }
+ @Test
+ public void testGetDetachedValue_SpecificAvroType() {
+ AvroType<Person> specificType = Avros.records(Person.class);
+ Person person = createPerson();
Person detachedPerson = specificType.getDetachedValue(person);
assertEquals(person, detachedPerson);
assertNotSame(person, detachedPerson);
@@ -169,14 +177,38 @@ public class AvroTypeTest {
@Test
public void testGetDetachedValue_ReflectAvroType() {
AvroType<Person> reflectType = Avros.reflects(Person.class);
- Person person = new Person();
- person.setName("name value");
- person.setAge(42);
- person.setSiblingnames(Lists.<CharSequence> newArrayList());
-
+ Person person = createPerson();
Person detachedPerson = reflectType.getDetachedValue(person);
assertEquals(person, detachedPerson);
assertNotSame(person, detachedPerson);
}
+ @Test
+ public void testGetDetachedValue_Pair() {
+ Person person = createPerson();
+ AvroType<Pair<Integer, Person>> pairType = Avros.pairs(Avros.ints(),
+ Avros.records(Person.class));
+
+ Pair<Integer, Person> inputPair = Pair.of(1, person);
+ Pair<Integer, Person> detachedPair = pairType.getDetachedValue(inputPair);
+
+ assertEquals(inputPair, detachedPair);
+ assertNotSame(inputPair.second(), detachedPair.second());
+ }
+
+ @Test
+ public void testGetDetachedValue_Collection(){
+ Person person = createPerson();
+ List<Person> personList = Lists.newArrayList(person);
+
+ AvroType<Collection<Person>> collectionType = Avros.collections(Avros.records(Person.class));
+
+ Collection<Person> detachedCollection = collectionType.getDetachedValue(personList);
+
+ assertEquals(personList, detachedCollection);
+ Person detachedPerson = detachedCollection.iterator().next();
+
+ assertNotSame(person, detachedPerson);
+ }
+
}