You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/04/25 22:52:54 UTC

git commit: CRUNCH-382 Add DeepCopier for Avro ByteBuffers

Repository: crunch
Updated Branches:
  refs/heads/master 26f6bb2d3 -> f81b5c253


CRUNCH-382 Add DeepCopier for Avro ByteBuffers

Replace the NoOpDeepCopier for Avro ByteBuffers. ByteBuffers are
not immutable, and are reused within Avro, so places where
PType#getDetachedValue is called on Avro byte[] values will now
work as expected.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f81b5c25
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f81b5c25
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f81b5c25

Branch: refs/heads/master
Commit: f81b5c253fb6380e5cd68d320a9a74f0b2a2c5d0
Parents: 26f6bb2
Author: Gabriel Reid <gr...@apache.org>
Authored: Fri Apr 25 21:56:31 2014 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Fri Apr 25 22:36:51 2014 +0200

----------------------------------------------------------------------
 .../crunch/types/avro/AvroDeepCopier.java       | 23 ++++++++++++++++++++
 .../org/apache/crunch/types/avro/Avros.java     |  2 +-
 .../crunch/types/avro/AvroDeepCopierTest.java   | 16 ++++++++++++--
 3 files changed, 38 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f81b5c25/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 1eca1b8..56ec459 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -19,6 +19,7 @@ package org.apache.crunch.types.avro;
 
 import java.io.ByteArrayOutputStream;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -205,4 +206,26 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
     }
   }
 
+  /**
+   * Copies ByteBuffers that are stored in Avro. A specific case is needed here
+   * because ByteBuffers are the one built-in case where the serialization type is different
+   * than the output type and the output type isn't immutable.
+   */
+  public static class AvroByteBufferDeepCopier implements DeepCopier<ByteBuffer> {
+
+    public static final AvroByteBufferDeepCopier INSTANCE = new AvroByteBufferDeepCopier();
+
+    @Override
+    public void initialize(Configuration conf) {
+      // No-op
+    }
+
+    @Override
+    public ByteBuffer deepCopy(ByteBuffer source) {
+      byte[] copy = new byte[source.limit()];
+      System.arraycopy(source.array(), 0, copy, 0, source.limit());
+      return ByteBuffer.wrap(copy);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/f81b5c25/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 266cb12..1fcb30e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -188,7 +188,7 @@ public class Avros {
   private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
   private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
       Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(),
-      NoOpDeepCopier.<ByteBuffer>create(), AvroType.AvroRecordType.GENERIC);
+      AvroDeepCopier.AvroByteBufferDeepCopier.INSTANCE, AvroType.AvroRecordType.GENERIC);
 
   private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
       .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)

http://git-wip-us.apache.org/repos/asf/crunch/blob/f81b5c25/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
index 37c13c0..9d43f0c 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -17,20 +17,21 @@
  */
 package org.apache.crunch.types.avro;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-
 public class AvroDeepCopierTest {
   
   @Test
@@ -104,4 +105,20 @@ public class AvroDeepCopierTest {
     assertNull(deepCopyPerson);
   }
 
+  /**
+   * Verifies that the copier returns a buffer backed by its own array, so later writes to
+   * the source array do not show through.
+   */
+  @Test
+  public void testDeepCopy_ByteBuffer() {
+    byte[] bytes = new byte[] { 1, 2, 3 };
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    // INSTANCE is a static field; reference it through the class, not a throwaway instance
+    ByteBuffer deepCopied = AvroDeepCopier.AvroByteBufferDeepCopier.INSTANCE.deepCopy(buffer);
+    assertNotSame(buffer, deepCopied);
+
+    // Change the original array to make sure we've really got a copy
+    bytes[0] = 0;
+    assertArrayEquals(new byte[] { 1, 2, 3 }, deepCopied.array());
+  }
 }