You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/06/09 22:08:21 UTC
git commit: CRUNCH-416 Use built-in Avro deep copying
Repository: crunch
Updated Branches:
refs/heads/apache-crunch-0.8 83b0dba7c -> de346d01d
CRUNCH-416 Use built-in Avro deep copying
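Replace serialization-based deep copying for Avro specific and
generic records with Avro's built-in deepCopy (SpecificData and
GenericData). Reflect-based records keep the serialization round trip.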
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/de346d01
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/de346d01
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/de346d01
Branch: refs/heads/apache-crunch-0.8
Commit: de346d01dc378b11161fcfd52c4e168c5832f4d4
Parents: 83b0dba
Author: Gabriel Reid <gr...@apache.org>
Authored: Sun Jun 8 22:19:20 2014 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Mon Jun 9 22:06:28 2014 +0200
----------------------------------------------------------------------
.../crunch/types/avro/AvroDeepCopier.java | 147 +++++++------------
.../org/apache/crunch/types/avro/Avros.java | 6 +-
.../crunch/types/avro/AvroDeepCopierTest.java | 29 ++--
.../AvroSpecificDeepCopierClassloaderTest.java | 2 +-
4 files changed, 75 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/de346d01/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 855aa79..dcc4a19 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -24,16 +24,13 @@ import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificData;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.types.DeepCopier;
import org.apache.hadoop.conf.Configuration;
@@ -48,13 +45,8 @@ import org.apache.hadoop.conf.Configuration;
abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
private String jsonSchema;
- private transient Configuration conf;
+ protected transient Configuration conf;
private transient Schema schema;
- private BinaryEncoder binaryEncoder;
- private BinaryDecoder binaryDecoder;
-
- private transient DatumWriter<T> datumWriter;
- private transient DatumReader<T> datumReader;
public AvroDeepCopier(Schema schema) {
this.jsonSchema = schema.toString();
@@ -72,39 +64,19 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
this.conf = conf;
}
- protected abstract T createCopyTarget();
-
- protected abstract DatumWriter<T> createDatumWriter(Configuration conf);
-
- protected abstract DatumReader<T> createDatumReader(Configuration conf);
-
/**
* Deep copier for Avro specific data objects.
*/
public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> {
- private Class<T> valueClass;
-
- public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
+ public AvroSpecificDeepCopier(Schema schema) {
super(schema);
- this.valueClass = valueClass;
- }
-
- @Override
- protected T createCopyTarget() {
- return createNewInstance(valueClass);
- }
-
- @Override
- protected DatumWriter<T> createDatumWriter(Configuration conf) {
- return new SpecificDatumWriter<T>(valueClass);
}
@Override
- protected DatumReader<T> createDatumReader(Configuration conf) {
- return new SpecificDatumReader<T>(valueClass);
+ public T deepCopy(T source) {
+ return SpecificData.get().deepCopy(getSchema(), source);
}
-
}
/**
@@ -112,25 +84,13 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
*/
public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
- private transient Schema schema;
-
public AvroGenericDeepCopier(Schema schema) {
super(schema);
}
@Override
- protected Record createCopyTarget() {
- return new GenericData.Record(getSchema());
- }
-
- @Override
- protected DatumReader<Record> createDatumReader(Configuration conf) {
- return new GenericDatumReader<Record>(getSchema());
- }
-
- @Override
- protected DatumWriter<Record> createDatumWriter(Configuration conf) {
- return new GenericDatumWriter<Record>(getSchema());
+ public Record deepCopy(Record source) {
+ return GenericData.get().deepCopy(getSchema(), source);
}
}
@@ -139,72 +99,71 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
*/
public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
- private Class<T> valueClass;
+ private DatumReader<T> datumReader;
+ private DatumWriter<T> datumWriter;
+ private BinaryEncoder binaryEncoder;
+ private BinaryDecoder binaryDecoder;
+ private final Class<T> valueClass;
public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
super(schema);
this.valueClass = valueClass;
}
- @Override
- protected T createCopyTarget() {
- return createNewInstance(valueClass);
- }
-
- @Override
protected DatumReader<T> createDatumReader(Configuration conf) {
return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getReader(getSchema());
}
- @Override
protected DatumWriter<T> createDatumWriter(Configuration conf) {
return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getWriter(getSchema());
}
- }
- /**
- * Create a deep copy of an Avro value.
- *
- * @param source The value to be copied
- * @return The deep copy of the value
- */
- @Override
- public T deepCopy(T source) {
-
- if (source == null) {
- return null;
- }
-
- if (datumReader == null) {
- datumReader = createDatumReader(conf);
- }
- if (datumWriter == null) {
- datumWriter = createDatumWriter(conf);
- }
- ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
- binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
- T target = createCopyTarget();
- try {
- datumWriter.write(source, binaryEncoder);
- binaryEncoder.flush();
- binaryDecoder = DecoderFactory.get()
- .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
- return datumReader.read(target, binaryDecoder);
- } catch (Exception e) {
- throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
+ /**
+ * Create a deep copy of an Avro value.
+ *
+ * @param source The value to be copied
+ * @return The deep copy of the value
+ */
+ @Override
+ public T deepCopy(T source) {
+
+ if (source == null) {
+ return null;
+ }
+
+ if (datumReader == null) {
+ datumReader = createDatumReader(conf);
+ }
+ if (datumWriter == null) {
+ datumWriter = createDatumWriter(conf);
+ }
+ ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+ binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
+ T target = createNewInstance(valueClass);
+ try {
+ datumWriter.write(source, binaryEncoder);
+ binaryEncoder.flush();
+ binaryDecoder = DecoderFactory.get()
+ .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+ return datumReader.read(target, binaryDecoder);
+ } catch (Exception e) {
+ throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
+ }
}
- }
- protected T createNewInstance(Class<T> targetClass) {
- try {
- return targetClass.newInstance();
- } catch (InstantiationException e) {
- throw new CrunchRuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new CrunchRuntimeException(e);
+ protected T createNewInstance(Class<T> targetClass) {
+ try {
+ return targetClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new CrunchRuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new CrunchRuntimeException(e);
+ }
}
}
+
+
/**
* Copies ByteBuffers that are stored in Avro. A specific case is needed here
* because ByteBuffers are the one built-in case where the serialization type is different
http://git-wip-us.apache.org/repos/asf/crunch/blob/de346d01/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 62945b1..4b2c67b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -252,8 +252,8 @@ public class Avros {
}
public static final AvroType<GenericData.Record> generics(Schema schema) {
- return new AvroType<GenericData.Record>(GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier(
- schema));
+ return new AvroType<GenericData.Record>(
+ GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier(schema));
}
public static final <T> AvroType<T> containers(Class<T> clazz) {
@@ -266,7 +266,7 @@ public class Avros {
public static final <T extends SpecificRecord> AvroType<T> specifics(Class<T> clazz) {
T t = ReflectionUtils.newInstance(clazz, null);
Schema schema = t.getSchema();
- return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroSpecificDeepCopier<T>(clazz, schema));
+ return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroSpecificDeepCopier<T>(schema));
}
public static final <T> AvroType<T> reflects(Class<T> clazz) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/de346d01/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
index 9d43f0c..795e2b4 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -41,27 +41,35 @@ public class AvroDeepCopierTest {
person.age = 42;
person.siblingnames = Lists.<CharSequence> newArrayList();
- Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
- .deepCopy(person);
+ Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.SCHEMA$).deepCopy(person);
assertEquals(person, deepCopyPerson);
assertNotSame(person, deepCopyPerson);
}
@Test
+ public void testDeepCopySpecific_Null() {
+ assertNull(new AvroSpecificDeepCopier<Person>(Person.SCHEMA$).deepCopy(null));
+ }
+
+ @Test
public void testDeepCopyGeneric() {
Record record = new Record(Person.SCHEMA$);
record.put("name", "John Doe");
record.put("age", 42);
record.put("siblingnames", Lists.newArrayList());
- Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$)
- .deepCopy(record);
+ Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(record);
assertEquals(record, deepCopyRecord);
assertNotSame(record, deepCopyRecord);
}
+ @Test
+ public void testDeepCopyGeneric_Null() {
+ assertNull(new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(null));
+ }
+
static class ReflectedPerson {
String name;
int age;
@@ -94,15 +102,14 @@ public class AvroDeepCopierTest {
assertNotSame(person, deepCopyPerson);
}
-
- @Test
- public void testDeepCopy_Null() {
- Person person = null;
- Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
- .deepCopy(person);
+ @Test
+ public void testDeepCopyReflect_Null() {
+ AvroDeepCopier<ReflectedPerson> avroDeepCopier = new AvroDeepCopier.AvroReflectDeepCopier<ReflectedPerson>(
+ ReflectedPerson.class, Avros.reflects(ReflectedPerson.class).getSchema());
+ avroDeepCopier.initialize(new Configuration());
- assertNull(deepCopyPerson);
+ assertNull(avroDeepCopier.deepCopy(null));
}
@Test
http://git-wip-us.apache.org/repos/asf/crunch/blob/de346d01/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java
index 11b0a78..26a8c6a 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java
@@ -37,7 +37,7 @@ public class AvroSpecificDeepCopierClassloaderTest {
person.age = 42;
person.siblingnames = Lists.newArrayList();
- Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
+ Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.SCHEMA$)
.deepCopy(person);
assertEquals(person, deepCopyPerson);