You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/08/16 07:09:06 UTC
git commit: CRUNCH-45 - Fix deep copying on Writables
Updated Branches:
refs/heads/master e487447b0 -> 2932e9e4a
CRUNCH-45 - Fix deep copying on Writables
Correct broken getDetachedValue for Writable tuples, collections,
and maps. Also refactor Writable deepcopying to use MapFns that
are already present.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2932e9e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2932e9e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2932e9e4
Branch: refs/heads/master
Commit: 2932e9e4a053414e961b77e419a8b1d07ee2daa2
Parents: e487447
Author: Gabriel Reid <gr...@apache.org>
Authored: Wed Aug 15 09:30:32 2012 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Thu Aug 16 06:56:37 2012 +0200
----------------------------------------------------------------------
.../java/org/apache/crunch/types/DeepCopier.java | 13 ++-
.../org/apache/crunch/types/TupleDeepCopier.java | 8 +-
.../apache/crunch/types/avro/AvroDeepCopier.java | 74 +++++++++--
.../apache/crunch/types/avro/AvroTableType.java | 3 +-
.../org/apache/crunch/types/avro/AvroType.java | 38 +-----
.../java/org/apache/crunch/types/avro/Avros.java | 95 ++++++++-------
.../crunch/types/writable/WritableDeepCopier.java | 60 +++++++++
.../apache/crunch/types/writable/WritableType.java | 16 +--
.../apache/crunch/types/writable/Writables.java | 36 +-----
.../org/apache/crunch/types/PTypeUtilsTest.java | 2 +-
.../apache/crunch/types/TupleDeepCopierTest.java | 4 +-
.../crunch/types/avro/AvroDeepCopierTest.java | 1 +
.../org/apache/crunch/types/avro/AvroTypeTest.java | 14 ++-
.../org/apache/crunch/types/avro/AvrosTest.java | 5 +-
.../types/writable/WritableDeepCopierTest.java | 46 +++++++
.../crunch/types/writable/WritableTypeTest.java | 53 +++++++-
.../crunch/types/writable/WritablesTest.java | 7 -
17 files changed, 316 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
index 539b808..a96e7bf 100644
--- a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
@@ -17,13 +17,15 @@
*/
package org.apache.crunch.types;
+import java.io.Serializable;
+
/**
* Performs deep copies of values.
*
* @param <T>
* The type of value that will be copied
*/
-public interface DeepCopier<T> {
+public interface DeepCopier<T> extends Serializable {
/**
* Create a deep copy of a value.
@@ -33,5 +35,14 @@ public interface DeepCopier<T> {
* @return The deep copy of the value
*/
T deepCopy(T source);
+
+ static class NoOpDeepCopier<V> implements DeepCopier<V> {
+ @Override
+ public V deepCopy(V source) {
+ return source;
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
index 094b582..4f473a0 100644
--- a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
@@ -21,6 +21,8 @@ import java.util.List;
import org.apache.crunch.Tuple;
+import com.google.common.collect.Lists;
+
/**
* Performs deep copies (based on underlying PType deep copying) of Tuple-based
* objects.
@@ -33,9 +35,9 @@ public class TupleDeepCopier<T extends Tuple> implements DeepCopier<T> {
private final TupleFactory<T> tupleFactory;
private final List<PType> elementTypes;
- public TupleDeepCopier(PType<T> ptype) {
- tupleFactory = TupleFactory.getTupleFactory(ptype.getTypeClass());
- elementTypes = ptype.getSubTypes();
+ public TupleDeepCopier(Class<T> tupleClass, PType...elementTypes) {
+ tupleFactory = TupleFactory.getTupleFactory(tupleClass);
+ this.elementTypes = Lists.newArrayList(elementTypes);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index ad5ba04..fe4fe1a 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -48,18 +48,31 @@ import org.apache.crunch.types.DeepCopier;
*/
public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
+ private String jsonSchema;
+ private transient Schema schema;
private BinaryEncoder binaryEncoder;
private BinaryDecoder binaryDecoder;
- protected DatumWriter<T> datumWriter;
- protected DatumReader<T> datumReader;
- protected AvroDeepCopier(DatumWriter<T> datumWriter, DatumReader<T> datumReader) {
- this.datumWriter = datumWriter;
- this.datumReader = datumReader;
+ private transient DatumWriter<T> datumWriter;
+ private transient DatumReader<T> datumReader;
+
+ public AvroDeepCopier(Schema schema) {
+ this.jsonSchema = schema.toString();
+ }
+
+ protected Schema getSchema() {
+ if (schema == null) {
+ schema = new Schema.Parser().parse(jsonSchema);
+ }
+ return schema;
}
protected abstract T createCopyTarget();
+ protected abstract DatumWriter<T> createDatumWriter();
+
+ protected abstract DatumReader<T> createDatumReader();
+
/**
* Deep copier for Avro specific data objects.
*/
@@ -68,7 +81,7 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
private Class<T> valueClass;
public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
- super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader<T>(schema));
+ super(schema);
this.valueClass = valueClass;
}
@@ -77,6 +90,16 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
return createNewInstance(valueClass);
}
+ @Override
+ protected DatumWriter<T> createDatumWriter() {
+ return new SpecificDatumWriter<T>(getSchema());
+ }
+
+ @Override
+ protected DatumReader<T> createDatumReader() {
+ return new SpecificDatumReader<T>(getSchema());
+ }
+
}
/**
@@ -84,16 +107,25 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
*/
public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
- private Schema schema;
+ private transient Schema schema;
public AvroGenericDeepCopier(Schema schema) {
- super(new GenericDatumWriter<Record>(schema), new GenericDatumReader<Record>(schema));
- this.schema = schema;
+ super(schema);
}
@Override
protected Record createCopyTarget() {
- return new GenericData.Record(schema);
+ return new GenericData.Record(getSchema());
+ }
+
+ @Override
+ protected DatumReader<Record> createDatumReader() {
+ return new GenericDatumReader<Record>(getSchema());
+ }
+
+ @Override
+ protected DatumWriter<Record> createDatumWriter() {
+ return new GenericDatumWriter<Record>(getSchema());
}
}
@@ -101,10 +133,11 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
* Deep copier for Avro reflect data objects.
*/
public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
+
private Class<T> valueClass;
public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
- super(new ReflectDatumWriter<T>(schema), new ReflectDatumReader<T>(schema));
+ super(schema);
this.valueClass = valueClass;
}
@@ -112,6 +145,16 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
protected T createCopyTarget() {
return createNewInstance(valueClass);
}
+
+ @Override
+ protected DatumReader<T> createDatumReader() {
+ return new ReflectDatumReader<T>(getSchema());
+ }
+
+ @Override
+ protected DatumWriter<T> createDatumWriter() {
+ return new ReflectDatumWriter<T>(getSchema());
+ }
}
public static class AvroTupleDeepCopier {
@@ -127,14 +170,19 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
*/
@Override
public T deepCopy(T source) {
+ if (datumReader == null) {
+ datumReader = createDatumReader();
+ }
+ if (datumWriter == null) {
+ datumWriter = createDatumWriter();
+ }
ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
T target = createCopyTarget();
try {
datumWriter.write(source, binaryEncoder);
binaryEncoder.flush();
- binaryDecoder = DecoderFactory.get()
- .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+ binaryDecoder = DecoderFactory.get().binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
datumReader.read(target, binaryDecoder);
} catch (Exception e) {
throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index eb26bf1..bd4b14c 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -25,6 +25,7 @@ import org.apache.crunch.lib.PTables;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleDeepCopier;
import org.apache.hadoop.conf.Configuration;
/**
@@ -121,7 +122,7 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT
public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), valueType.getSchema()),
new IndexedRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()), new PairToAvroPair(keyType,
- valueType), keyType, valueType);
+ valueType), new TupleDeepCopier(Pair.class, keyType, valueType), keyType, valueType);
this.keyType = keyType;
this.valueType = valueType;
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index 82c4c91..6e35f5b 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -17,9 +17,7 @@
*/
package org.apache.crunch.types.avro;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -27,16 +25,12 @@ import org.apache.avro.specific.SpecificRecord;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.MapFn;
import org.apache.crunch.SourceTarget;
-import org.apache.crunch.Tuple;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.io.avro.AvroFileSourceTarget;
-import org.apache.crunch.types.CollectionDeepCopier;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.DeepCopier;
-import org.apache.crunch.types.MapDeepCopier;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.TupleDeepCopier;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Preconditions;
@@ -59,17 +53,18 @@ public class AvroType<T> implements PType<T> {
private final List<PType> subTypes;
private DeepCopier<T> deepCopier;
- public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
- this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), ptypes);
+ public AvroType(Class<T> typeClass, Schema schema, DeepCopier<T> deepCopier, PType... ptypes) {
+ this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes);
}
- public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
+ public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, DeepCopier<T> deepCopier,
PType... ptypes) {
this.typeClass = typeClass;
this.schema = Preconditions.checkNotNull(schema);
this.schemaString = schema.toString();
this.baseInputMapFn = inputMapFn;
this.baseOutputMapFn = outputMapFn;
+ this.deepCopier = deepCopier;
this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
}
@@ -155,31 +150,8 @@ public class AvroType<T> implements PType<T> {
return new AvroFileSourceTarget<T>(path, this);
}
- private DeepCopier<T> getDeepCopier() {
- if (deepCopier == null) {
- if (Tuple.class.isAssignableFrom(this.typeClass)) {
- deepCopier = new TupleDeepCopier(this);
- } else if (Map.class.isAssignableFrom(this.typeClass)){
- deepCopier = new MapDeepCopier(this.subTypes.get(0));
- } else if (Collection.class.isAssignableFrom(this.typeClass)){
- deepCopier = new CollectionDeepCopier(this.subTypes.get(0));
- } else if (isSpecific()) {
- deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema());
- } else if (isGeneric()) {
- deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema());
- } else {
- deepCopier = new AvroDeepCopier.AvroReflectDeepCopier<T>(typeClass, getSchema());
- }
- }
- return deepCopier;
- }
-
public T getDetachedValue(T value) {
-
- if (!Avros.isPrimitive(this)) {
- return getDeepCopier().deepCopy(value);
- }
- return value;
+ return deepCopier.deepCopy(value);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index 00a297c..038f805 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -44,9 +44,14 @@ import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
import org.apache.crunch.fn.CompositeMapFn;
import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.types.CollectionDeepCopier;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.crunch.types.MapDeepCopier;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleDeepCopier;
import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.types.writable.WritableDeepCopier;
import org.apache.crunch.util.PTypes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -77,8 +82,7 @@ public class Avros {
public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
public static void configureReflectDataFactory(Configuration conf) {
- conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(),
- ReflectDataFactory.class);
+ conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
}
public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
@@ -110,8 +114,8 @@ public class Avros {
}
};
- private static final AvroType<String> strings = new AvroType<String>(String.class,
- Schema.create(Schema.Type.STRING), UTF8_TO_STRING, STRING_TO_UTF8);
+ private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING),
+ UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier<String>());
private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
@@ -119,12 +123,11 @@ public class Avros {
private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE);
private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
- Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance());
+ Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), new DeepCopier.NoOpDeepCopier<ByteBuffer>());
- private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
- .<Class<?>, PType<?>> builder().put(String.class, strings).put(Long.class, longs)
- .put(Integer.class, ints).put(Float.class, floats).put(Double.class, doubles)
- .put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+ private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
+ .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
+ .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
@@ -141,7 +144,7 @@ public class Avros {
}
private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
- return new AvroType<T>(clazz, Schema.create(schemaType));
+ return new AvroType<T>(clazz, Schema.create(schemaType), new DeepCopier.NoOpDeepCopier<T>());
}
public static final AvroType<Void> nulls() {
@@ -184,7 +187,8 @@ public class Avros {
}
public static final AvroType<GenericData.Record> generics(Schema schema) {
- return new AvroType<GenericData.Record>(GenericData.Record.class, schema);
+ return new AvroType<GenericData.Record>(GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier(
+ schema));
}
public static final <T> AvroType<T> containers(Class<T> clazz) {
@@ -192,7 +196,8 @@ public class Avros {
}
public static final <T> AvroType<T> reflects(Class<T> clazz) {
- return new AvroType<T>(clazz, REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz));
+ Schema schema = REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz);
+ return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema));
}
private static class BytesToWritableMapFn<T extends Writable> extends MapFn<ByteBuffer, T> {
@@ -208,8 +213,8 @@ public class Avros {
public T map(ByteBuffer input) {
T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration());
try {
- instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input
- .arrayOffset(), input.limit())));
+ instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input.arrayOffset(), input
+ .limit())));
} catch (IOException e) {
LOG.error("Exception thrown reading instance of: " + writableClazz, e);
}
@@ -234,8 +239,8 @@ public class Avros {
}
public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
- return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(
- clazz), new WritableToBytesMapFn<T>());
+ return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
+ new WritableToBytesMapFn<T>(), new WritableDeepCopier<T>(clazz));
}
private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
@@ -279,8 +284,7 @@ public class Avros {
}
}
- private static class CollectionToGenericDataArray extends
- MapFn<Collection<?>, GenericData.Array<?>> {
+ private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
private final MapFn mapFn;
private final String jsonSchema;
@@ -322,11 +326,9 @@ public class Avros {
public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
AvroType<T> avroType = (AvroType<T>) ptype;
Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
- GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(
- avroType.getInputMapFn());
- CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema,
- avroType.getOutputMapFn());
- return new AvroType(Collection.class, collectionSchema, input, output, ptype);
+ GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
+ CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
+ return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier<T>(ptype), ptype);
}
private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> {
@@ -398,7 +400,7 @@ public class Avros {
Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn());
- return new AvroType(Map.class, mapSchema, inputFn, outputFn, ptype);
+ return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier<T>(ptype), ptype);
}
private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> {
@@ -459,7 +461,7 @@ public class Avros {
private final String jsonSchema;
private final boolean isReflect;
private transient Schema schema;
-
+
public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
this.fns = Lists.newArrayList();
this.avroTypes = Lists.newArrayList();
@@ -497,13 +499,13 @@ public class Avros {
fn.setContext(getContext());
}
}
-
- private GenericRecord createRecord(){
+
+ private GenericRecord createRecord() {
if (isReflect) {
return new ReflectGenericRecord(schema);
} else {
return new GenericData.Record(schema);
- }
+ }
}
@Override
@@ -525,28 +527,27 @@ public class Avros {
Schema schema = createTupleSchema(p1, p2);
GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2);
TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
- return new AvroType(Pair.class, schema, input, output, p1, p2);
+ return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), p1, p2);
}
- public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2,
- PType<V3> p3) {
+ public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
Schema schema = createTupleSchema(p1, p2, p3);
- return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2,
- p3), new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3);
+ return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
+ new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), p1, p2, p3);
}
- public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
- PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+ public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3,
+ PType<V4> p4) {
Schema schema = createTupleSchema(p1, p2, p3, p4);
- return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2,
- p3, p4), new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4);
+ return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
+ new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), p1, p2,
+ p3, p4);
}
public static final AvroType<TupleN> tuples(PType... ptypes) {
Schema schema = createTupleSchema(ptypes);
- return new AvroType(TupleN.class, schema,
- new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes), new TupleToGenericRecord(schema,
- ptypes), ptypes);
+ return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
+ new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), ptypes);
}
public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
@@ -556,8 +557,8 @@ public class Avros {
typeArgs[i] = ptypes[i].getTypeClass();
}
TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
- return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes),
- new TupleToGenericRecord(schema, ptypes), ptypes);
+ return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema,
+ ptypes), new TupleDeepCopier(clazz, ptypes), ptypes);
}
private static Schema createTupleSchema(PType<?>... ptypes) {
@@ -574,12 +575,12 @@ public class Avros {
return schema;
}
- public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
- MapFn<T, S> outputFn, PType<S> base) {
+ public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
+ PType<S> base) {
AvroType<S> abase = (AvroType<S>) base;
- return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(),
- inputFn), new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray(
- new PType[0]));
+ return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn),
+ new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier<T>(), base.getSubTypes()
+ .toArray(new PType[0]));
}
public static <T> PType<T> jsons(Class<T> clazz) {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
new file mode 100644
index 0000000..6469208
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * Performs deep copies of Writable values.
+ * @param <T> The type of Writable that can be copied
+ */
+public class WritableDeepCopier<T extends Writable> implements DeepCopier<T>{
+
+ private Class<T> writableClass;
+
+ public WritableDeepCopier(Class<T> writableClass){
+ this.writableClass = writableClass;
+ }
+
+ @Override
+ public T deepCopy(T source) {
+ ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(byteOutStream);
+ T copiedValue = null;
+ try {
+ source.write(dataOut);
+ dataOut.flush();
+ ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+ DataInput dataInput = new DataInputStream(byteInStream);
+ copiedValue = writableClass.newInstance();
+ copiedValue.readFields(dataInput);
+ } catch (Exception e) {
+ throw new CrunchRuntimeException("Error while deep copying " + source, e);
+ }
+ return copiedValue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
index 45502a3..23c95ea 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -22,9 +22,9 @@ import java.util.List;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.MapFn;
import org.apache.crunch.SourceTarget;
-import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.io.seq.SeqFileSourceTarget;
import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.DeepCopier;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.fs.Path;
@@ -39,6 +39,7 @@ public class WritableType<T, W extends Writable> implements PType<T> {
private final Converter converter;
private final MapFn<W, T> inputFn;
private final MapFn<T, W> outputFn;
+ private final DeepCopier<W> deepCopier;
private final List<PType> subTypes;
WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> inputDoFn, MapFn<T, W> outputDoFn,
@@ -48,6 +49,7 @@ public class WritableType<T, W extends Writable> implements PType<T> {
this.inputFn = inputDoFn;
this.outputFn = outputDoFn;
this.converter = new WritableValueConverter(writableClass);
+ this.deepCopier = new WritableDeepCopier<W>(writableClass);
this.subTypes = ImmutableList.<PType> builder().add(subTypes).build();
}
@@ -99,17 +101,11 @@ public class WritableType<T, W extends Writable> implements PType<T> {
return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) && subTypes.equals(wt.subTypes));
}
- // Unchecked warnings are suppressed because we know that W and T are the same
- // type (due to the IdentityFn being used)
- @SuppressWarnings("unchecked")
@Override
public T getDetachedValue(T value) {
- if (this.inputFn.getClass().equals(IdentityFn.class)) {
- W writableValue = (W) value;
- return (T) Writables.deepCopy(writableValue, this.writableClass);
- } else {
- return value;
- }
+ W writableValue = outputFn.map(value);
+ W deepCopy = this.deepCopier.deepCopy(writableValue);
+ return inputFn.map(deepCopy);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
index f4906b7..23bc7f5 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -17,11 +17,6 @@
*/
package org.apache.crunch.types.writable;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
@@ -35,8 +30,11 @@ import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
import org.apache.crunch.fn.CompositeMapFn;
import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.types.CollectionDeepCopier;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.crunch.types.MapDeepCopier;
import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleDeepCopier;
import org.apache.crunch.types.TupleFactory;
import org.apache.crunch.util.PTypes;
import org.apache.hadoop.conf.Configuration;
@@ -582,31 +580,7 @@ public class Writables {
return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());
}
- /**
- * Perform a deep copy of a writable value.
- *
- * @param value
- * The value to be copied
- * @param writableClass
- * The Writable class of the value to be copied
- * @return A fully detached deep copy of the input value
- */
- public static <T extends Writable> T deepCopy(T value, Class<T> writableClass) {
- ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
- DataOutputStream dataOut = new DataOutputStream(byteOutStream);
- T copiedValue = null;
- try {
- value.write(dataOut);
- dataOut.flush();
- ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
- DataInput dataInput = new DataInputStream(byteInStream);
- copiedValue = writableClass.newInstance();
- copiedValue.readFields(dataInput);
- } catch (Exception e) {
- throw new CrunchRuntimeException("Error while deep copying " + value, e);
- }
- return copiedValue;
- }
+
// Not instantiable
private Writables() {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java b/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
index f9217d5..e6fd90c 100644
--- a/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
@@ -77,7 +77,7 @@ public class PTypeUtilsTest {
@Test
public void testAvroRegistered() {
- AvroType<Utf8> at = new AvroType<Utf8>(Utf8.class, Schema.create(Schema.Type.STRING));
+ AvroType<Utf8> at = new AvroType<Utf8>(Utf8.class, Schema.create(Schema.Type.STRING), new DeepCopier.NoOpDeepCopier<Utf8>());
Avros.register(Utf8.class, at);
assertEquals(at, Avros.records(Utf8.class));
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
index 8a3a12f..017a813 100644
--- a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
@@ -37,8 +37,8 @@ public class TupleDeepCopierTest {
person.setSiblingnames(Lists.<CharSequence> newArrayList());
Pair<Integer, Person> inputPair = Pair.of(1, person);
- DeepCopier<Pair<Integer, Person>> deepCopier = new TupleDeepCopier<Pair<Integer, Person>>(
- Avros.pairs(Avros.ints(), Avros.records(Person.class)));
+ DeepCopier<Pair> deepCopier = new TupleDeepCopier<Pair>(
+ Pair.class, Avros.ints(), Avros.records(Person.class));
Pair<Integer, Person> deepCopyPair = deepCopier.deepCopy(inputPair);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
index fa1b4c4..6e2d89e 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -20,6 +20,7 @@ package org.apache.crunch.types.avro;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.crunch.test.Person;
import org.apache.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier;
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
index 092b89e..28777f5 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.crunch.Pair;
+import org.apache.crunch.TupleN;
import org.apache.crunch.test.Person;
import org.apache.crunch.test.StringWrapper;
import org.junit.Test;
@@ -228,5 +229,16 @@ public class AvroTypeTest {
assertEquals(stringPersonMap, detachedMap);
assertNotSame(value, detachedMap.get(key));
}
-
+
+ @Test
+ public void testGetDetachedValue_TupleN(){
+ Person person = createPerson();
+ AvroType<TupleN> ptype = Avros.tuples(Avros.records(Person.class));
+ TupleN tuple = new TupleN(person);
+ TupleN detachedTuple = ptype.getDetachedValue(tuple);
+
+ assertEquals(tuple, detachedTuple);
+ assertNotSame(person, detachedTuple.get(0));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
index aec19f2..6e66f74 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
@@ -17,10 +17,10 @@
*/
package org.apache.crunch.types.avro;
-import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
@@ -39,6 +39,7 @@ import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
import org.apache.crunch.test.Person;
import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.DeepCopier;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.hadoop.io.IntWritable;
@@ -224,7 +225,7 @@ public class AvrosTest {
@Test
public void testIsPrimitive_TruePrimitiveValue(){
- AvroType truePrimitiveAvroType = new AvroType(int.class, Schema.create(Type.INT));
+ AvroType truePrimitiveAvroType = new AvroType(int.class, Schema.create(Type.INT), new DeepCopier.NoOpDeepCopier());
assertTrue(Avros.isPrimitive(truePrimitiveAvroType));
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
new file mode 100644
index 0000000..a691df2
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class WritableDeepCopierTest {
+
+ private WritableDeepCopier<Text> deepCopier;
+
+ @Before
+ public void setUp(){
+ deepCopier = new WritableDeepCopier<Text>(Text.class);
+ }
+
+ @Test
+ public void testDeepCopy(){
+ Text text = new Text("value");
+ Text deepCopy = deepCopier.deepCopy(text);
+
+ assertEquals(text, deepCopy);
+ assertNotSame(text, deepCopy);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
index ea0d11a..51a87f5 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
@@ -20,18 +20,20 @@ package org.apache.crunch.types.writable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.Pair;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;
-public class WritableTypeTest {
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
- @Test
- public void testGetDetachedValue_AlreadyMappedWritable() {
- WritableType<String, Text> stringType = Writables.strings();
- String value = "test";
- assertSame(value, stringType.getDetachedValue(value));
- }
+public class WritableTypeTest {
@Test
public void testGetDetachedValue_CustomWritable() {
@@ -43,4 +45,41 @@ public class WritableTypeTest {
assertNotSame(value, detachedValue);
}
+ @Test
+ public void testGetDetachedValue_Collection() {
+ Collection<Text> textCollection = Lists.newArrayList(new Text("value"));
+ WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables.collections(Writables
+ .writables(Text.class));
+
+ Collection<Text> detachedCollection = ptype.getDetachedValue(textCollection);
+ assertEquals(textCollection, detachedCollection);
+ assertNotSame(textCollection.iterator().next(), detachedCollection.iterator().next());
+ }
+
+ @Test
+ public void testGetDetachedValue_Tuple() {
+ Pair<Text, Text> textPair = Pair.of(new Text("one"), new Text("two"));
+ WritableType<Pair<Text, Text>, TupleWritable> ptype = Writables.pairs(Writables.writables(Text.class),
+ Writables.writables(Text.class));
+ ptype.getOutputMapFn().initialize();
+ ptype.getInputMapFn().initialize();
+
+ Pair<Text, Text> detachedPair = ptype.getDetachedValue(textPair);
+ assertEquals(textPair, detachedPair);
+ assertNotSame(textPair.first(), detachedPair.first());
+ assertNotSame(textPair.second(), detachedPair.second());
+ }
+
+ @Test
+ public void testGetDetachedValue_Map() {
+ Map<String, Text> stringTextMap = Maps.newHashMap();
+ stringTextMap.put("key", new Text("value"));
+
+ WritableType<Map<String, Text>, MapWritable> ptype = Writables.maps(Writables.writables(Text.class));
+ Map<String, Text> detachedMap = ptype.getDetachedValue(stringTextMap);
+
+ assertEquals(stringTextMap, detachedMap);
+ assertNotSame(stringTextMap.get("key"), detachedMap.get("key"));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
index a8699b3..5396fba 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java
@@ -253,11 +253,4 @@ public class WritablesTest {
assertEquals(writable, ptype.getOutputMapFn().map(java));
}
- @Test
- public void testDeepCopy() {
- Text text = new Text("Test");
- Text copiedText = Writables.deepCopy(text, Text.class);
- assertEquals(text, copiedText);
- assertNotSame(text, copiedText);
- }
}