You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/01/22 20:22:37 UTC

git commit: CRUNCH-328: Add support for ExtensionRegistry in PTypes.protos with a SerializableSupplier

Updated Branches:
  refs/heads/master 809a6d701 -> 349e89612


CRUNCH-328: Add support for ExtensionRegistry in PTypes.protos with a SerializableSupplier


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/349e8961
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/349e8961
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/349e8961

Branch: refs/heads/master
Commit: 349e896124422b8b76ef591400f8f50aaffe6731
Parents: 809a6d7
Author: Josh Wills <jw...@apache.org>
Authored: Tue Jan 21 18:36:03 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Jan 22 11:22:19 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/types/PTypes.java    | 50 +++++++++++++++++++-
 .../crunch/util/SerializableSupplier.java       | 31 ++++++++++++
 2 files changed, 79 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/349e8961/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
index cbb9c7c..e701747 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
@@ -21,8 +21,10 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 
+import com.google.protobuf.ExtensionRegistry;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.MapFn;
+import org.apache.crunch.util.SerializableSupplier;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
@@ -41,27 +43,59 @@ import com.google.protobuf.Message;
  */
 public class PTypes {
 
+  /**
+   * Returns a PType for Java's {@link BigInteger} type, derived from {@code typeFamily.bytes()}.
+   */
   public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
     return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
   }
 
+  /**
+   * Returns a PType for Java's {@link UUID} type, derived from {@code ptf.bytes()}.
+   */
   public static PType<UUID> uuid(PTypeFamily ptf) {
     return ptf.derived(UUID.class, BYTE_TO_UUID, UUID_TO_BYTE, ptf.bytes());
   }
-  
+
+  /**
+   * Constructs a PType that reads and writes {@code clazz} as JSON strings using Jackson's {@link ObjectMapper}.
+   */
   public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
     return typeFamily
         .derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings());
   }
 
+  /**
+   * Constructs a PType for the protocol buffer class {@code clazz}, read with an empty {@link ExtensionRegistry}.
+   */
   public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
     return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes());
   }
 
+  /**
+   * Constructs a PType for the protocol buffer class {@code clazz}. When reading, the given {@code supplier}
+   * provides the {@link ExtensionRegistry} used to parse the bytes; the output side is built without it.
+   */
+  public static <T extends Message> PType<T> protos(
+      Class<T> clazz,
+      PTypeFamily typeFamily,
+      SerializableSupplier<ExtensionRegistry> supplier) {
+    return typeFamily.derived(clazz,
+        new ProtoInputMapFn<T>(clazz, supplier),
+        new ProtoOutputMapFn<T>(),
+        typeFamily.bytes());
+  }
+
+  /**
+   * Constructs a PType for the Thrift class {@code clazz}, derived from {@code typeFamily.bytes()}.
+   */
   public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
     return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes());
   }
 
+  /**
+   * Constructs a PType for the Java {@code Enum} class {@code type}, derived from {@code typeFamily.strings()}.
+   */
   public static <T extends Enum> PType<T> enums(Class<T> type, PTypeFamily typeFamily) {
     return typeFamily.derived(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(), typeFamily.strings());
   }
@@ -126,21 +160,33 @@ public class PTypes {
   private static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
 
     private final Class<T> clazz;
+    private final SerializableSupplier<ExtensionRegistry> extensionSupplier;
     private transient T instance;
+    private transient ExtensionRegistry registry;
 
     ProtoInputMapFn(Class<T> clazz) {
+      this(clazz, null); // no supplier; initialize() then falls back to the empty registry
+    }
+
+    ProtoInputMapFn(Class<T> clazz, SerializableSupplier<ExtensionRegistry> extensionSupplier) {
       this.clazz = clazz;
+      this.extensionSupplier = extensionSupplier; // may be null; initialize() checks for that
     }
 
     @Override
     public void initialize() {
       this.instance = Protos.getDefaultInstance(clazz);
+      if (this.extensionSupplier != null) { // registry is a transient field, so it is set here
+        this.registry = extensionSupplier.get();
+      } else {
+        this.registry = ExtensionRegistry.getEmptyRegistry(); // no supplier was given
+      }
     }
 
     @Override
     public T map(ByteBuffer bb) {
       try {
-        return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build();
+        return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit(), registry).build();
       } catch (InvalidProtocolBufferException e) {
         throw new CrunchRuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/crunch/blob/349e8961/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java b/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java
new file mode 100644
index 0000000..3642feb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/SerializableSupplier.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import com.google.common.base.Supplier;
+
+import java.io.Serializable;
+
+/**
+ * A Guava {@link Supplier} that is also {@link Serializable}, so a Crunch DoFn can hold one
+ * as a field and call {@code get()} at processing time to construct an instance of a
+ * non-serializable type (for example, a protobuf {@code ExtensionRegistry}).
+ * @param <T> the type of object supplied
+ */
+public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+}