You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/04/25 22:52:54 UTC
git commit: CRUNCH-382 Add DeepCopier for Avro ByteBuffers
Repository: crunch
Updated Branches:
refs/heads/master 26f6bb2d3 -> f81b5c253
CRUNCH-382 Add DeepCopier for Avro ByteBuffers
Replace the NoOpDeepCopier for Avro ByteBuffers. ByteBuffers are
not immutable, and are reused within Avro, so calls to
PType#getDetachedValue on Avro byte[] values will now work as
expected.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f81b5c25
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f81b5c25
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f81b5c25
Branch: refs/heads/master
Commit: f81b5c253fb6380e5cd68d320a9a74f0b2a2c5d0
Parents: 26f6bb2
Author: Gabriel Reid <gr...@apache.org>
Authored: Fri Apr 25 21:56:31 2014 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Fri Apr 25 22:36:51 2014 +0200
----------------------------------------------------------------------
.../crunch/types/avro/AvroDeepCopier.java | 23 ++++++++++++++++++++
.../org/apache/crunch/types/avro/Avros.java | 2 +-
.../crunch/types/avro/AvroDeepCopierTest.java | 16 ++++++++++++--
3 files changed, 38 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/f81b5c25/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 1eca1b8..56ec459 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -19,6 +19,7 @@ package org.apache.crunch.types.avro;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -205,4 +206,26 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
}
}
+ /**
+ * Copies ByteBuffers that are stored in Avro. A specific case is needed here
+ * because ByteBuffers are the one built-in case where the serialization type is different
+ * than the output type and the output type isn't immutable.
+ */
+ public static class AvroByteBufferDeepCopier implements DeepCopier<ByteBuffer> {
+
+ public static final AvroByteBufferDeepCopier INSTANCE = new AvroByteBufferDeepCopier();
+
+ @Override
+ public void initialize(Configuration conf) {
+ // No-op
+ }
+
+ @Override
+ public ByteBuffer deepCopy(ByteBuffer source) {
+ byte[] copy = new byte[source.limit()];
+ System.arraycopy(source.array(), 0, copy, 0, source.limit());
+ return ByteBuffer.wrap(copy);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f81b5c25/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 266cb12..1fcb30e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -188,7 +188,7 @@ public class Avros {
private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(),
- NoOpDeepCopier.<ByteBuffer>create(), AvroType.AvroRecordType.GENERIC);
+ AvroDeepCopier.AvroByteBufferDeepCopier.INSTANCE, AvroType.AvroRecordType.GENERIC);
private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
.put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
http://git-wip-us.apache.org/repos/asf/crunch/blob/f81b5c25/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
index 37c13c0..9d43f0c 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -17,20 +17,21 @@
*/
package org.apache.crunch.types.avro;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
+import java.nio.ByteBuffer;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.avro.generic.GenericData.Record;
import org.apache.crunch.test.Person;
import org.apache.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
-import com.google.common.collect.Lists;
-
public class AvroDeepCopierTest {
@Test
@@ -104,4 +105,15 @@ public class AvroDeepCopierTest {
assertNull(deepCopyPerson);
}
+ /**
+  * Verifies that the ByteBuffer deep copier returns a buffer with its own backing array,
+  * so later changes to the original bytes do not show up in the copy.
+  */
+ @Test
+ public void testDeepCopy_ByteBuffer() {
+   byte[] bytes = new byte[] { 1, 2, 3 };
+   ByteBuffer buffer = ByteBuffer.wrap(bytes);
+   // INSTANCE is a static singleton: reference it through the class instead of
+   // constructing a throwaway instance just to read a static field.
+   ByteBuffer deepCopied = AvroDeepCopier.AvroByteBufferDeepCopier.INSTANCE.deepCopy(buffer);
+
+   assertNotSame(buffer, deepCopied);
+
+   // Change the original array to make sure we've really got a copy
+   bytes[0] = 0;
+   assertArrayEquals(new byte[] { 1, 2, 3 }, deepCopied.array());
+ }
}