You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/03/13 22:36:33 UTC

git commit: CRUNCH-363: Fixes protobufs to work with collection and union wrappers

Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 82ce49f8d -> 39e450442


CRUNCH-363: Fixes protobufs to work with collection and union wrappers


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/39e45044
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/39e45044
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/39e45044

Branch: refs/heads/apache-crunch-0.8
Commit: 39e45044238804be58655aa3626b6008a3a15aa6
Parents: 82ce49f
Author: Josh Wills <jw...@apache.org>
Authored: Thu Mar 13 14:36:25 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Mar 13 14:36:25 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/lib/CogroupIT.java   |  69 +++
 .../org/apache/crunch/lib/PersonProtos.java     | 540 +++++++++++++++++++
 crunch-core/src/it/resources/person.proto       |  28 +
 .../apache/crunch/types/writable/Writables.java |  17 +-
 4 files changed, 649 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/39e45044/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
index 191c737..9be5b1e 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -23,12 +23,15 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import org.apache.crunch.*;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.PersonProtos.Person;
+import org.apache.crunch.lib.PersonProtos.Person.Builder;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.test.Tests;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.junit.After;
@@ -108,6 +111,26 @@ public class CogroupIT {
     runCogroupN(AvroTypeFamily.getInstance());
   }
 
+  @Test
+  public void testCogroupProtosWritables() {
+    runCogroupProtos(WritableTypeFamily.getInstance()); // proto-valued cogroup, Writable type family
+  }
+
+  @Test
+  public void testCogroupProtosAvro() {
+    runCogroupProtos(AvroTypeFamily.getInstance()); // proto-valued cogroup, Avro type family
+  }
+
+  @Test
+  public void testCogroupProtosPairsWritables() {
+    runCogroupProtosPairs(WritableTypeFamily.getInstance()); // (String, Person) pair values, Writable type family
+  }
+
+  @Test
+  public void testCogroupProtosPairsAvro() {
+    runCogroupProtosPairs(AvroTypeFamily.getInstance()); // (String, Person) pair values, Avro type family
+  }
+
   public void runCogroup(PTypeFamily ptf) {
     PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
 
@@ -133,6 +156,32 @@ public class CogroupIT {
     assertThat(actual, is(expected));
   }
 
+  /** Cogroups two String-to-Person tables built from lines1 and lines2; asserts 4 map entries. */
+  public void runCogroupProtos(PTypeFamily ptf) {
+    PType<Person> personType = PTypes.protos(Person.class, ptf);
+    PTableType<String, Person> tableType = ptf.tableOf(ptf.strings(), personType);
+    PTable<String, Person> left = lines1.parallelDo("kv1", new GenerateProto(), tableType);
+    PTable<String, Person> right = lines2.parallelDo("kv2", new GenerateProto(), tableType);
+
+    // Each cogrouped value pairs the Person collection from the left table with the one from the right.
+    Map<String, Pair<Collection<Person>, Collection<Person>>> grouped =
+        Cogroup.cogroup(left, right).materializeToMap();
+    assertThat(grouped.size(), is(4));
+  }
+
+  /** Cogroups two tables whose values are (String, Person) pairs; asserts 4 map entries. */
+  public void runCogroupProtosPairs(PTypeFamily ptf) {
+    PType<Pair<String, Person>> valueType = ptf.pairs(ptf.strings(), PTypes.protos(Person.class, ptf));
+    PTableType<String, Pair<String, Person>> tableType = ptf.tableOf(ptf.strings(), valueType);
+    PTable<String, Pair<String, Person>> left = lines1.parallelDo("kv1", new GenerateProtoPairs(), tableType);
+    PTable<String, Pair<String, Person>> right = lines2.parallelDo("kv2", new GenerateProtoPairs(), tableType);
+
+    // Each side of a cogrouped value is the collection of (String, Person) pairs seen for that key.
+    Map<String, Pair<Collection<Pair<String, Person>>, Collection<Pair<String, Person>>>> grouped =
+        Cogroup.cogroup(left, right).materializeToMap();
+    assertThat(grouped.size(), is(4));
+  }
+
   public void runCogroup3(PTypeFamily ptf) {
     PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
 
@@ -229,6 +278,26 @@ public class CogroupIT {
     }
   }
 
+  /** Emits one (key, Person) pair per input line; key is the first comma-separated field. */
+  private static class GenerateProto extends DoFn<String, Pair<String, Person>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, Person>> emitter) {
+      String key = input.split(",")[0];
+      Builder person = Person.newBuilder().setFirst("first" + key).setLast("last" + key);
+      emitter.emit(Pair.of(key, person.build()));
+    }
+  }
+
+  /** Emits (fields[0], (fields[1], Person)) per comma-separated line; Person is named from fields[0]. */
+  private static class GenerateProtoPairs extends DoFn<String, Pair<String, Pair<String, Person>>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, Pair<String, Person>>> emitter) {
+      String[] fields = input.split(",");
+      Builder person = Person.newBuilder().setFirst("first" + fields[0]).setLast("last" + fields[0]);
+      emitter.emit(Pair.of(fields[0], Pair.of(fields[1], person.build())));
+    }
+  }
+
   private static Collection<String> coll(String... values) {
     return ImmutableSet.copyOf(values);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/39e45044/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java b/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java
new file mode 100644
index 0000000..c13376c
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: crunch-core/src/it/resources/person.proto
+
+package org.apache.crunch.lib;
+
+public final class PersonProtos {
+  private PersonProtos() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+  }
+  public interface PersonOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+    
+    // optional string first = 1;
+    boolean hasFirst();
+    String getFirst();
+    
+    // optional string last = 2;
+    boolean hasLast();
+    String getLast();
+  }
+  public static final class Person extends
+      com.google.protobuf.GeneratedMessage
+      implements PersonOrBuilder {
+    // Use Person.newBuilder() to construct.
+    private Person(Builder builder) {
+      super(builder);
+    }
+    private Person(boolean noInit) {}
+    
+    private static final Person defaultInstance;
+    public static Person getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public Person getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_descriptor;
+    }
+    
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_fieldAccessorTable;
+    }
+    
+    private int bitField0_;
+    // optional string first = 1;
+    public static final int FIRST_FIELD_NUMBER = 1;
+    private java.lang.Object first_;
+    public boolean hasFirst() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getFirst() {
+      java.lang.Object ref = first_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          first_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getFirstBytes() {
+      java.lang.Object ref = first_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        first_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // optional string last = 2;
+    public static final int LAST_FIELD_NUMBER = 2;
+    private java.lang.Object last_;
+    public boolean hasLast() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public String getLast() {
+      java.lang.Object ref = last_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          last_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getLastBytes() {
+      java.lang.Object ref = last_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        last_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    private void initFields() {
+      first_ = "";
+      last_ = "";
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getFirstBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getLastBytes());
+      }
+      getUnknownFields().writeTo(output);
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getFirstBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getLastBytes());
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.crunch.lib.PersonProtos.Person parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.crunch.lib.PersonProtos.Person parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.crunch.lib.PersonProtos.Person parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.crunch.lib.PersonProtos.Person parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.crunch.lib.PersonProtos.Person prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.crunch.lib.PersonProtos.PersonOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_fieldAccessorTable;
+      }
+      
+      // Construct using org.apache.crunch.lib.PersonProtos.Person.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private Builder(BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        first_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        last_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.crunch.lib.PersonProtos.Person.getDescriptor();
+      }
+      
+      public org.apache.crunch.lib.PersonProtos.Person getDefaultInstanceForType() {
+        return org.apache.crunch.lib.PersonProtos.Person.getDefaultInstance();
+      }
+      
+      public org.apache.crunch.lib.PersonProtos.Person build() {
+        org.apache.crunch.lib.PersonProtos.Person result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.crunch.lib.PersonProtos.Person buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.crunch.lib.PersonProtos.Person result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.crunch.lib.PersonProtos.Person buildPartial() {
+        org.apache.crunch.lib.PersonProtos.Person result = new org.apache.crunch.lib.PersonProtos.Person(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.first_ = first_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.last_ = last_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.crunch.lib.PersonProtos.Person) {
+          return mergeFrom((org.apache.crunch.lib.PersonProtos.Person)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+      
+      public Builder mergeFrom(org.apache.crunch.lib.PersonProtos.Person other) {
+        if (other == org.apache.crunch.lib.PersonProtos.Person.getDefaultInstance()) return this;
+        if (other.hasFirst()) {
+          setFirst(other.getFirst());
+        }
+        if (other.hasLast()) {
+          setLast(other.getLast());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(
+            this.getUnknownFields());
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              this.setUnknownFields(unknownFields.build());
+              onChanged();
+              return this;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              }
+              break;
+            }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              first_ = input.readBytes();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              last_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // optional string first = 1;
+      private java.lang.Object first_ = "";
+      public boolean hasFirst() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getFirst() {
+        java.lang.Object ref = first_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          first_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setFirst(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        first_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearFirst() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        first_ = getDefaultInstance().getFirst();
+        onChanged();
+        return this;
+      }
+      void setFirst(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000001;
+        first_ = value;
+        onChanged();
+      }
+      
+      // optional string last = 2;
+      private java.lang.Object last_ = "";
+      public boolean hasLast() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public String getLast() {
+        java.lang.Object ref = last_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          last_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setLast(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        last_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearLast() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        last_ = getDefaultInstance().getLast();
+        onChanged();
+        return this;
+      }
+      void setLast(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000002;
+        last_ = value;
+        onChanged();
+      }
+      
+      // @@protoc_insertion_point(builder_scope:crunch.Person)
+    }
+    
+    static {
+      defaultInstance = new Person(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:crunch.Person)
+  }
+  
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_crunch_Person_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_crunch_Person_fieldAccessorTable;
+  
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    java.lang.String[] descriptorData = {
+      "\n)crunch-core/src/it/resources/person.pr" +
+      "oto\022\006crunch\"%\n\006Person\022\r\n\005first\030\001 \001(\t\022\014\n\004" +
+      "last\030\002 \001(\tB\'\n\025org.apache.crunch.libB\014Per" +
+      "sonProtosH\001"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root;
+          internal_static_crunch_Person_descriptor =
+            getDescriptor().getMessageTypes().get(0);
+          internal_static_crunch_Person_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_crunch_Person_descriptor,
+              new java.lang.String[] { "First", "Last", },
+              org.apache.crunch.lib.PersonProtos.Person.class,
+              org.apache.crunch.lib.PersonProtos.Person.Builder.class);
+          return null;
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+        }, assigner);
+  }
+  
+  // @@protoc_insertion_point(outer_class_scope)
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/39e45044/crunch-core/src/it/resources/person.proto
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/person.proto b/crunch-core/src/it/resources/person.proto
new file mode 100644
index 0000000..b973234
--- /dev/null
+++ b/crunch-core/src/it/resources/person.proto
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package crunch;
+
+option java_package = "org.apache.crunch.lib";
+option java_outer_classname = "PersonProtos";
+
+option optimize_for = SPEED;
+
+message Person {
+   optional string first = 1;  // CogroupIT's generators set this to "first" + key
+   optional string last = 2;   // CogroupIT's generators set this to "last" + key
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/39e45044/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
index d087ca3..89464ac 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -359,6 +359,14 @@ public class Writables {
     return new WritableTableType((WritableType) key, (WritableType) value);
   }
 
+  /** Returns w unchanged if it is a BytesWritable, else wraps WritableUtils.toByteArray(w). */
+  private static BytesWritable asBytesWritable(Writable w) {
+    if (!(w instanceof BytesWritable)) {
+      return new BytesWritable(WritableUtils.toByteArray(w));
+    }
+    return (BytesWritable) w;
+  }
+
   private static <W extends Writable> W create(Class<W> clazz, Writable writable) {
     if (clazz.equals(writable.getClass())) {
       return (W) writable;
@@ -512,7 +520,7 @@ public class Writables {
             values[i] = w;
             written[i] = WRITABLE_CODES.inverse().get(w.getClass());
           } else {
-            values[i] = new BytesWritable(WritableUtils.toByteArray(w));
+            values[i] = asBytesWritable(w);
             written[i] = 1; // code for BytesWritable
           }
         }
@@ -652,7 +660,7 @@ public class Writables {
     public UnionWritable map(Union input) {
       int index = input.getIndex();
       Writable w = (Writable) fns.get(index).map(input.getValue());
-      return new UnionWritable(index, new BytesWritable(WritableUtils.toByteArray(w)));
+      return new UnionWritable(index, asBytesWritable(w));
     }
   }
 
@@ -744,8 +752,7 @@ public class Writables {
       BytesWritable[] w = new BytesWritable[input.size()];
       int index = 0;
       for (T in : input) {
-        Writable v = (Writable) mapFn.map(in);
-        w[index++] = new BytesWritable(WritableUtils.toByteArray(v));
+        w[index++] = asBytesWritable((Writable) mapFn.map(in));
       }
       arrayWritable.set(w);
       return arrayWritable;
@@ -822,7 +829,7 @@ public class Writables {
       TextMapWritable tmw = new TextMapWritable();
       for (Map.Entry<String, T> e : input.entrySet()) {
         Writable w = mapFn.map(e.getValue());
-        tmw.put(new Text(e.getKey()), new BytesWritable(WritableUtils.toByteArray(w)));
+        tmw.put(new Text(e.getKey()), asBytesWritable(w));
       }
       return tmw;
     }