You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/06/09 22:08:21 UTC

git commit: CRUNCH-416 Use built-in Avro deep copying

Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 83b0dba7c -> de346d01d


CRUNCH-416 Use built-in Avro deep copying

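Replace serialization-based deep copying with Avro's built-in
deepCopy (SpecificData/GenericData) for Avro specific and generic
records. Reflect-based records continue to use serialization-based
copying.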


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/de346d01
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/de346d01
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/de346d01

Branch: refs/heads/apache-crunch-0.8
Commit: de346d01dc378b11161fcfd52c4e168c5832f4d4
Parents: 83b0dba
Author: Gabriel Reid <gr...@apache.org>
Authored: Sun Jun 8 22:19:20 2014 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Mon Jun 9 22:06:28 2014 +0200

----------------------------------------------------------------------
 .../crunch/types/avro/AvroDeepCopier.java       | 147 +++++++------------
 .../org/apache/crunch/types/avro/Avros.java     |   6 +-
 .../crunch/types/avro/AvroDeepCopierTest.java   |  29 ++--
 .../AvroSpecificDeepCopierClassloaderTest.java  |   2 +-
 4 files changed, 75 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/de346d01/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 855aa79..dcc4a19 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -24,16 +24,13 @@ import java.nio.ByteBuffer;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificData;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.types.DeepCopier;
 import org.apache.hadoop.conf.Configuration;
@@ -48,13 +45,8 @@ import org.apache.hadoop.conf.Configuration;
 abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
 
   private String jsonSchema;
-  private transient Configuration conf;
+  protected transient Configuration conf;
   private transient Schema schema;
-  private BinaryEncoder binaryEncoder;
-  private BinaryDecoder binaryDecoder;
-
-  private transient DatumWriter<T> datumWriter;
-  private transient DatumReader<T> datumReader;
 
   public AvroDeepCopier(Schema schema) {
     this.jsonSchema = schema.toString();
@@ -72,39 +64,19 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
     this.conf = conf;
   }
 
-  protected abstract T createCopyTarget();
-
-  protected abstract DatumWriter<T> createDatumWriter(Configuration conf);
-
-  protected abstract DatumReader<T> createDatumReader(Configuration conf);
-
   /**
    * Deep copier for Avro specific data objects.
    */
   public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> {
 
-    private Class<T> valueClass;
-
-    public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
+    public AvroSpecificDeepCopier(Schema schema) {
       super(schema);
-      this.valueClass = valueClass;
-    }
-
-    @Override
-    protected T createCopyTarget() {
-      return createNewInstance(valueClass);
-    }
-
-    @Override
-    protected DatumWriter<T> createDatumWriter(Configuration conf) {
-      return new SpecificDatumWriter<T>(valueClass);
     }
 
     @Override
-    protected DatumReader<T> createDatumReader(Configuration conf) {
-      return new SpecificDatumReader<T>(valueClass);
+    public T deepCopy(T source) {
+      return SpecificData.get().deepCopy(getSchema(), source);
     }
-
   }
 
   /**
@@ -112,25 +84,13 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
    */
   public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
 
-    private transient Schema schema;
-
     public AvroGenericDeepCopier(Schema schema) {
       super(schema);
     }
 
     @Override
-    protected Record createCopyTarget() {
-      return new GenericData.Record(getSchema());
-    }
-
-    @Override
-    protected DatumReader<Record> createDatumReader(Configuration conf) {
-      return new GenericDatumReader<Record>(getSchema());
-    }
-
-    @Override
-    protected DatumWriter<Record> createDatumWriter(Configuration conf) {
-      return new GenericDatumWriter<Record>(getSchema());
+    public Record deepCopy(Record source) {
+      return GenericData.get().deepCopy(getSchema(), source);
     }
   }
 
@@ -139,72 +99,71 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
    */
   public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
 
-    private Class<T> valueClass;
+    private DatumReader<T> datumReader;
+    private DatumWriter<T> datumWriter;
+    private BinaryEncoder binaryEncoder;
+    private BinaryDecoder binaryDecoder;
+    private final Class<T> valueClass;
 
     public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
       super(schema);
       this.valueClass = valueClass;
     }
 
-    @Override
-    protected T createCopyTarget() {
-      return createNewInstance(valueClass);
-    }
-
-    @Override
     protected DatumReader<T> createDatumReader(Configuration conf) {
       return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getReader(getSchema());
     }
 
-    @Override
     protected DatumWriter<T> createDatumWriter(Configuration conf) {
       return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getWriter(getSchema());
     }
-  }
 
-  /**
-   * Create a deep copy of an Avro value.
-   * 
-   * @param source The value to be copied
-   * @return The deep copy of the value
-   */
-  @Override
-  public T deepCopy(T source) {
-    
-    if (source == null) {
-      return null;
-    }
-    
-    if (datumReader == null) {
-      datumReader = createDatumReader(conf);
-    }
-    if (datumWriter == null) {
-      datumWriter = createDatumWriter(conf);
-    }
-    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
-    binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
-    T target = createCopyTarget();
-    try {
-      datumWriter.write(source, binaryEncoder);
-      binaryEncoder.flush();
-      binaryDecoder = DecoderFactory.get()
-          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
-      return datumReader.read(target, binaryDecoder);
-    } catch (Exception e) {
-      throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
+    /**
+     * Create a deep copy of an Avro value.
+     *
+     * @param source The value to be copied
+     * @return The deep copy of the value
+     */
+    @Override
+    public T deepCopy(T source) {
+
+      if (source == null) {
+        return null;
+      }
+
+      if (datumReader == null) {
+        datumReader = createDatumReader(conf);
+      }
+      if (datumWriter == null) {
+        datumWriter = createDatumWriter(conf);
+      }
+      ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+      binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
+      T target = createNewInstance(valueClass);
+      try {
+        datumWriter.write(source, binaryEncoder);
+        binaryEncoder.flush();
+        binaryDecoder = DecoderFactory.get()
+            .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+        return datumReader.read(target, binaryDecoder);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
+      }
     }
-  }
 
-  protected T createNewInstance(Class<T> targetClass) {
-    try {
-      return targetClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new CrunchRuntimeException(e);
-    } catch (IllegalAccessException e) {
-      throw new CrunchRuntimeException(e);
+    protected T createNewInstance(Class<T> targetClass) {
+      try {
+        return targetClass.newInstance();
+      } catch (InstantiationException e) {
+        throw new CrunchRuntimeException(e);
+      } catch (IllegalAccessException e) {
+        throw new CrunchRuntimeException(e);
+      }
     }
   }
 
+
+
   /**
    * Copies ByteBuffers that are stored in Avro. A specific case is needed here
    * because ByteBuffers are the one built-in case where the serialization type is different

http://git-wip-us.apache.org/repos/asf/crunch/blob/de346d01/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 62945b1..4b2c67b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -252,8 +252,8 @@ public class Avros {
   }
 
   public static final AvroType<GenericData.Record> generics(Schema schema) {
-    return new AvroType<GenericData.Record>(GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier(
-        schema));
+    return new AvroType<GenericData.Record>(
+        GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier(schema));
   }
 
   public static final <T> AvroType<T> containers(Class<T> clazz) {
@@ -266,7 +266,7 @@ public class Avros {
   public static final <T extends SpecificRecord> AvroType<T> specifics(Class<T> clazz) {
     T t = ReflectionUtils.newInstance(clazz, null);
     Schema schema = t.getSchema();
-    return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroSpecificDeepCopier<T>(clazz, schema));
+    return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroSpecificDeepCopier<T>(schema));
   }
 
   public static final <T> AvroType<T> reflects(Class<T> clazz) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/de346d01/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
index 9d43f0c..795e2b4 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -41,27 +41,35 @@ public class AvroDeepCopierTest {
     person.age = 42;
     person.siblingnames = Lists.<CharSequence> newArrayList();
 
-    Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
-        .deepCopy(person);
+    Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.SCHEMA$).deepCopy(person);
 
     assertEquals(person, deepCopyPerson);
     assertNotSame(person, deepCopyPerson);
   }
 
   @Test
+  public void testDeepCopySpecific_Null() {
+    assertNull(new AvroSpecificDeepCopier<Person>(Person.SCHEMA$).deepCopy(null));
+  }
+
+  @Test
   public void testDeepCopyGeneric() {
     Record record = new Record(Person.SCHEMA$);
     record.put("name", "John Doe");
     record.put("age", 42);
     record.put("siblingnames", Lists.newArrayList());
 
-    Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$)
-        .deepCopy(record);
+    Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(record);
 
     assertEquals(record, deepCopyRecord);
     assertNotSame(record, deepCopyRecord);
   }
 
+  @Test
+  public void testDeepCopyGeneric_Null() {
+    assertNull(new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(null));
+  }
+
   static class ReflectedPerson {
     String name;
     int age;
@@ -94,15 +102,14 @@ public class AvroDeepCopierTest {
     assertNotSame(person, deepCopyPerson);
 
   }
-  
-  @Test
-  public void testDeepCopy_Null() {
-    Person person = null;
 
-    Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
-        .deepCopy(person);
+  @Test
+  public void testDeepCopyReflect_Null() {
+    AvroDeepCopier<ReflectedPerson> avroDeepCopier = new AvroDeepCopier.AvroReflectDeepCopier<ReflectedPerson>(
+        ReflectedPerson.class, Avros.reflects(ReflectedPerson.class).getSchema());
+    avroDeepCopier.initialize(new Configuration());
 
-    assertNull(deepCopyPerson);
+    assertNull(avroDeepCopier.deepCopy(null));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/crunch/blob/de346d01/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java
index 11b0a78..26a8c6a 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java
@@ -37,7 +37,7 @@ public class AvroSpecificDeepCopierClassloaderTest {
     person.age = 42;
     person.siblingnames = Lists.newArrayList();
 
-    Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$)
+    Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.SCHEMA$)
         .deepCopy(person);
 
     assertEquals(person, deepCopyPerson);