You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/01/22 20:22:37 UTC
git commit: CRUNCH-328: Add support for ExtensionRegistry in PTypes.protos with a SerializableSupplier
Updated Branches:
refs/heads/master 809a6d701 -> 349e89612
CRUNCH-328: Add support for ExtensionRegistry in PTypes.protos with a SerializableSupplier
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/349e8961
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/349e8961
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/349e8961
Branch: refs/heads/master
Commit: 349e896124422b8b76ef591400f8f50aaffe6731
Parents: 809a6d7
Author: Josh Wills <jw...@apache.org>
Authored: Tue Jan 21 18:36:03 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Jan 22 11:22:19 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/crunch/types/PTypes.java | 50 +++++++++++++++++++-
.../crunch/util/SerializableSupplier.java | 31 ++++++++++++
2 files changed, 79 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/349e8961/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
index cbb9c7c..e701747 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
@@ -21,8 +21,10 @@ import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.UUID;
+import com.google.protobuf.ExtensionRegistry;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
+import org.apache.crunch.util.SerializableSupplier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
@@ -41,27 +43,59 @@ import com.google.protobuf.Message;
*/
public class PTypes {
+ /**
+ * Constructs a PType for Java's {@link BigInteger} type, derived from the
+ * {@code bytes()} PType of the given family.
+ *
+ * @param typeFamily the type family used to build the derived PType
+ * @return a PType that converts between the family's byte representation and {@code BigInteger}
+ */
public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
}
+ /**
+ * Constructs a PType for Java's {@link UUID} type, derived from the
+ * {@code bytes()} PType of the given family.
+ *
+ * @param ptf the type family used to build the derived PType
+ * @return a PType that converts between the family's byte representation and {@code UUID}
+ */
public static PType<UUID> uuid(PTypeFamily ptf) {
return ptf.derived(UUID.class, BYTE_TO_UUID, UUID_TO_BYTE, ptf.bytes());
}
-
+
+ /**
+ * Constructs a PType that stores instances of the given Java class as JSON strings, derived from
+ * the {@code strings()} PType of the given family. Reading and writing are delegated to
+ * {@code JacksonInputMapFn} and {@code JacksonOutputMapFn} (Jackson's {@code ObjectMapper}).
+ *
+ * @param clazz the Java class to read from / write to JSON
+ * @param typeFamily the type family used to build the derived PType
+ * @return a PType that converts between JSON strings and instances of {@code clazz}
+ */
public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
return typeFamily
.derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings());
}
+ /**
+ * Constructs a PType for the given protocol buffer message class, derived from the
+ * {@code bytes()} PType of the given family. Messages are parsed with an empty
+ * {@link ExtensionRegistry}; use the overload that takes a {@code SerializableSupplier}
+ * when extensions must be recognized.
+ *
+ * @param clazz the protobuf {@link Message} class
+ * @param typeFamily the type family used to build the derived PType
+ * @return a PType that converts between serialized bytes and messages of type {@code T}
+ */
public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes());
}
+ /**
+ * Builds a protocol buffer PType whose reader parses messages using an {@link ExtensionRegistry}
+ * obtained from {@code supplier}. Only the serializable supplier is stored in the input function;
+ * the registry itself is a transient field populated when that function is initialized.
+ * Passing {@code null} for {@code supplier} behaves like {@link #protos(Class, PTypeFamily)}.
+ *
+ * @param clazz the protobuf {@link Message} class
+ * @param typeFamily the type family that provides the underlying {@code bytes()} PType
+ * @param supplier serializable source of the extension registry, or {@code null} for none
+ * @return a PType that converts between serialized bytes and messages of type {@code T}
+ */
+ public static <T extends Message> PType<T> protos(
+ Class<T> clazz, PTypeFamily typeFamily, SerializableSupplier<ExtensionRegistry> supplier) {
+ MapFn<ByteBuffer, T> reader = new ProtoInputMapFn<T>(clazz, supplier);
+ return typeFamily.derived(clazz, reader, new ProtoOutputMapFn<T>(), typeFamily.bytes());
+ }
+
+ /**
+ * Constructs a PType for the given Thrift record class, derived from the {@code bytes()} PType
+ * of the given family. Conversion is delegated to {@code ThriftInputMapFn} and
+ * {@code ThriftOutputMapFn}.
+ *
+ * @param clazz the Thrift {@code TBase} class
+ * @param typeFamily the type family used to build the derived PType
+ * @return a PType that converts between serialized bytes and records of type {@code T}
+ */
public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes());
}
+ /**
+ * Constructs a PType for a Java {@code Enum} type, derived from the {@code strings()} PType of
+ * the given family. Conversion is delegated to {@code EnumInputMapper} and
+ * {@code EnumOutputMapper}.
+ *
+ * @param type the enum class
+ * @param typeFamily the type family used to build the derived PType
+ * @return a PType that converts between strings and constants of {@code type}
+ */
public static <T extends Enum> PType<T> enums(Class<T> type, PTypeFamily typeFamily) {
return typeFamily.derived(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(), typeFamily.strings());
}
@@ -126,21 +160,33 @@ public class PTypes {
private static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
private final Class<T> clazz;
+ private final SerializableSupplier<ExtensionRegistry> extensionSupplier;
private transient T instance;
+ private transient ExtensionRegistry registry;
+ /** Creates an input function that parses messages of {@code clazz} with the empty registry. */
ProtoInputMapFn(Class<T> clazz) {
+ this(clazz, null);
+ }
+
+ /**
+ * Creates an input function whose {@link ExtensionRegistry} is obtained from
+ * {@code extensionSupplier} during {@code initialize()}; a {@code null} supplier means the
+ * empty registry is used.
+ */
+ ProtoInputMapFn(Class<T> clazz, SerializableSupplier<ExtensionRegistry> extensionSupplier) {
this.clazz = clazz;
+ this.extensionSupplier = extensionSupplier;
}
@Override
public void initialize() {
this.instance = Protos.getDefaultInstance(clazz);
+ if (this.extensionSupplier != null) {
+ this.registry = extensionSupplier.get();
+ } else {
+ this.registry = ExtensionRegistry.getEmptyRegistry();
+ }
}
@Override
public T map(ByteBuffer bb) {
try {
- return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build();
+ return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit(), registry).build();
} catch (InvalidProtocolBufferException e) {
throw new CrunchRuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/349e8961/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java b/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java
new file mode 100644
index 0000000..3642feb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import com.google.common.base.Supplier;
+
+import java.io.Serializable;
+
+/**
+ * An extension of Guava's {@link Supplier} interface that indicates that an instance
+ * will also implement {@link Serializable}, which makes this object suitable for use
+ * with Crunch's DoFns when we need to construct an instance of a non-serializable
+ * type for use in processing.
+ */
+public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+}