You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/03/13 22:36:33 UTC
git commit: CRUNCH-363: Fixes protobufs to work with collection and
union wrappers
Repository: crunch
Updated Branches:
refs/heads/apache-crunch-0.8 82ce49f8d -> 39e450442
CRUNCH-363: Fixes protobufs to work with collection and union wrappers
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/39e45044
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/39e45044
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/39e45044
Branch: refs/heads/apache-crunch-0.8
Commit: 39e45044238804be58655aa3626b6008a3a15aa6
Parents: 82ce49f
Author: Josh Wills <jw...@apache.org>
Authored: Thu Mar 13 14:36:25 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Mar 13 14:36:25 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/crunch/lib/CogroupIT.java | 69 +++
.../org/apache/crunch/lib/PersonProtos.java | 540 +++++++++++++++++++
crunch-core/src/it/resources/person.proto | 28 +
.../apache/crunch/types/writable/Writables.java | 17 +-
4 files changed, 649 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/39e45044/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
index 191c737..9be5b1e 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -23,12 +23,15 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.crunch.*;
import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.PersonProtos.Person;
+import org.apache.crunch.lib.PersonProtos.Person.Builder;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.test.Tests;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypes;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.junit.After;
@@ -108,6 +111,26 @@ public class CogroupIT {
runCogroupN(AvroTypeFamily.getInstance());
}
+ @Test // Cogroups protobuf-valued tables whose types come from the Writable type family.
+ public void testCogroupProtosWritables() {
+ runCogroupProtos(WritableTypeFamily.getInstance());
+ }
+
+ @Test // Cogroups protobuf-valued tables whose types come from the Avro type family.
+ public void testCogroupProtosAvro() {
+ runCogroupProtos(AvroTypeFamily.getInstance());
+ }
+
+ @Test // Cogroups tables of (String, Person) pair values built with the Writable type family.
+ public void testCogroupProtosPairsWritables() {
+ runCogroupProtosPairs(WritableTypeFamily.getInstance());
+ }
+
+ @Test // Cogroups tables of (String, Person) pair values built with the Avro type family.
+ public void testCogroupProtosPairsAvro() {
+ runCogroupProtosPairs(AvroTypeFamily.getInstance());
+ }
+
public void runCogroup(PTypeFamily ptf) {
PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
@@ -133,6 +156,32 @@ public class CogroupIT {
assertThat(actual, is(expected));
}
+  /**
+   * Cogroups two string-keyed tables whose values are {@link Person} protobufs and checks how
+   * many distinct keys come back.
+   *
+   * @param ptf type family used to build the string key type and, via {@code PTypes.protos},
+   *     the protobuf value type
+   */
+  public void runCogroupProtos(PTypeFamily ptf) {
+    PTableType<String, Person> tableType =
+        ptf.tableOf(ptf.strings(), PTypes.protos(Person.class, ptf));
+
+    PTable<String, Person> left = lines1.parallelDo("kv1", new GenerateProto(), tableType);
+    PTable<String, Person> right = lines2.parallelDo("kv2", new GenerateProto(), tableType);
+
+    // The cogrouped values are Collection<Person> on each side, i.e. protobufs inside the
+    // collection wrapper.
+    Map<String, Pair<Collection<Person>, Collection<Person>>> grouped =
+        Cogroup.cogroup(left, right).materializeToMap();
+
+    // Only the number of keys is verified, not the grouped contents.
+    assertThat(grouped.size(), is(4));
+  }
+
+  /**
+   * Cogroups two string-keyed tables whose values are {@code Pair<String, Person>}, so the
+   * protobuf sits inside a pair that in turn ends up inside the cogroup collections, and checks
+   * how many distinct keys come back.
+   *
+   * @param ptf type family used to build the string, pair and protobuf types
+   */
+  public void runCogroupProtosPairs(PTypeFamily ptf) {
+    PTableType<String, Pair<String, Person>> tableType =
+        ptf.tableOf(ptf.strings(), ptf.pairs(ptf.strings(), PTypes.protos(Person.class, ptf)));
+
+    PTable<String, Pair<String, Person>> left =
+        lines1.parallelDo("kv1", new GenerateProtoPairs(), tableType);
+    PTable<String, Pair<String, Person>> right =
+        lines2.parallelDo("kv2", new GenerateProtoPairs(), tableType);
+
+    Map<String, Pair<Collection<Pair<String, Person>>, Collection<Pair<String, Person>>>> grouped =
+        Cogroup.cogroup(left, right).materializeToMap();
+
+    // Only the number of keys is verified, not the grouped contents.
+    assertThat(grouped.size(), is(4));
+  }
+
public void runCogroup3(PTypeFamily ptf) {
PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
@@ -229,6 +278,26 @@ public class CogroupIT {
}
}
+  /**
+   * Maps a comma-separated input line to a {@code (key, Person)} pair. The key is the first
+   * comma-separated field, and the person's first/last names are that key prefixed with
+   * {@code "first"} and {@code "last"} respectively.
+   */
+  private static class GenerateProto extends DoFn<String, Pair<String, Person>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, Person>> emitter) {
+      String id = input.split(",")[0];
+      Person person = Person.newBuilder()
+          .setFirst("first" + id)
+          .setLast("last" + id)
+          .build();
+      emitter.emit(Pair.of(id, person));
+    }
+  }
+
+  /**
+   * Maps a comma-separated input line to {@code (key, (second field, Person))}. The key is the
+   * first field, the inner pair's string is the second field, and the person's first/last names
+   * are the key prefixed with {@code "first"} and {@code "last"}. Lines with fewer than two
+   * fields fail with an {@link ArrayIndexOutOfBoundsException}.
+   */
+  private static class GenerateProtoPairs extends DoFn<String, Pair<String, Pair<String, Person>>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, Pair<String, Person>>> emitter) {
+      String[] parts = input.split(",");
+      String id = parts[0];
+      Person person = Person.newBuilder()
+          .setFirst("first" + id)
+          .setLast("last" + id)
+          .build();
+      emitter.emit(Pair.of(id, Pair.of(parts[1], person)));
+    }
+  }
+
private static Collection<String> coll(String... values) { // helper: packs the given strings into an immutable set
return ImmutableSet.copyOf(values);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/39e45044/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java b/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java
new file mode 100644
index 0000000..c13376c
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: crunch-core/src/it/resources/person.proto
+
+package org.apache.crunch.lib;
+
+public final class PersonProtos {
+ private PersonProtos() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ }
+ public interface PersonOrBuilder // read-only accessors shared by Person and Person.Builder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // optional string first = 1;
+ boolean hasFirst(); // whether field 1 has been set or parsed
+ String getFirst(); // field 1 as a String
+
+ // optional string last = 2;
+ boolean hasLast(); // whether field 2 has been set or parsed
+ String getLast(); // field 2 as a String
+ }
+ public static final class Person extends
+ com.google.protobuf.GeneratedMessage
+ implements PersonOrBuilder {
+ // Use Person.newBuilder() to construct.
+ private Person(Builder builder) {
+ super(builder);
+ }
+ private Person(boolean noInit) {}
+
+ private static final Person defaultInstance;
+ public static Person getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public Person getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_fieldAccessorTable;
+ }
+
+ private int bitField0_;
+ // optional string first = 1;
+ public static final int FIRST_FIELD_NUMBER = 1;
+ private java.lang.Object first_;
+ public boolean hasFirst() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public String getFirst() { // returns field 1, decoding it from ByteString on demand
+ java.lang.Object ref = first_; // first_ holds either a String or a ByteString
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ first_ = s; // cache the decoded String only for valid UTF-8; otherwise first_ stays a ByteString
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getFirstBytes() {
+ java.lang.Object ref = first_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ first_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // optional string last = 2;
+ public static final int LAST_FIELD_NUMBER = 2;
+ private java.lang.Object last_;
+ public boolean hasLast() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public String getLast() {
+ java.lang.Object ref = last_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ last_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getLastBytes() {
+ java.lang.Object ref = last_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ last_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ private void initFields() {
+ first_ = "";
+ last_ = "";
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize(); // return value ignored; the call fills memoizedSerializedSize
+ if (((bitField0_ & 0x00000001) == 0x00000001)) { // field 1 (first) is written only when its presence bit is set
+ output.writeBytes(1, getFirstBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) { // field 2 (last) is written only when its presence bit is set
+ output.writeBytes(2, getLastBytes());
+ }
+ getUnknownFields().writeTo(output); // unknown fields are written after the known ones
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() { // computes the encoded size once and caches it in memoizedSerializedSize
+ int size = memoizedSerializedSize;
+ if (size != -1) return size; // -1 means "not computed yet"
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) { // count field 1 only when it is present
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, getFirstBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) { // count field 2 only when it is present
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, getLastBytes());
+ }
+ size += getUnknownFields().getSerializedSize(); // unknown fields contribute to the total as well
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.crunch.lib.PersonProtos.Person parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.crunch.lib.PersonProtos.Person parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.crunch.lib.PersonProtos.Person parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.crunch.lib.PersonProtos.Person parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.crunch.lib.PersonProtos.Person prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.crunch.lib.PersonProtos.PersonOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.crunch.lib.PersonProtos.Person.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ first_ = "";
+ bitField0_ = (bitField0_ & ~0x00000001);
+ last_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.crunch.lib.PersonProtos.Person.getDescriptor();
+ }
+
+ public org.apache.crunch.lib.PersonProtos.Person getDefaultInstanceForType() {
+ return org.apache.crunch.lib.PersonProtos.Person.getDefaultInstance();
+ }
+
+ public org.apache.crunch.lib.PersonProtos.Person build() {
+ org.apache.crunch.lib.PersonProtos.Person result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.crunch.lib.PersonProtos.Person buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.crunch.lib.PersonProtos.Person result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.crunch.lib.PersonProtos.Person buildPartial() { // snapshots the builder into a new Person without an isInitialized() check
+ org.apache.crunch.lib.PersonProtos.Person result = new org.apache.crunch.lib.PersonProtos.Person(this);
+ int from_bitField0_ = bitField0_; // presence bits as currently held by the builder
+ int to_bitField0_ = 0; // presence bits to hand to the message
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.first_ = first_; // the value is copied regardless of the presence bit
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.last_ = last_; // the value is copied regardless of the presence bit
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.crunch.lib.PersonProtos.Person) {
+ return mergeFrom((org.apache.crunch.lib.PersonProtos.Person)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.crunch.lib.PersonProtos.Person other) {
+ if (other == org.apache.crunch.lib.PersonProtos.Person.getDefaultInstance()) return this;
+ if (other.hasFirst()) {
+ setFirst(other.getFirst());
+ }
+ if (other.hasLast()) {
+ setLast(other.getLast());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom( // reads tagged fields from the stream into this builder until the input ends
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0: // tag 0: no more fields; store the collected unknown fields and finish
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: { // any tag other than 0, 10 or 18 is handed to parseUnknownField
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) { // a false result also ends parsing
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ case 10: { // 10 == (1 << 3) | 2: field 1 (first)
+ bitField0_ |= 0x00000001;
+ first_ = input.readBytes(); // stored as ByteString; getFirst() decodes it later
+ break;
+ }
+ case 18: { // 18 == (2 << 3) | 2: field 2 (last)
+ bitField0_ |= 0x00000002;
+ last_ = input.readBytes(); // stored as ByteString; getLast() decodes it later
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // optional string first = 1;
+ private java.lang.Object first_ = "";
+ public boolean hasFirst() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public String getFirst() {
+ java.lang.Object ref = first_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ first_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setFirst(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ first_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearFirst() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ first_ = getDefaultInstance().getFirst();
+ onChanged();
+ return this;
+ }
+ void setFirst(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000001;
+ first_ = value;
+ onChanged();
+ }
+
+ // optional string last = 2;
+ private java.lang.Object last_ = "";
+ public boolean hasLast() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public String getLast() {
+ java.lang.Object ref = last_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ last_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setLast(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ last_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearLast() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ last_ = getDefaultInstance().getLast();
+ onChanged();
+ return this;
+ }
+ void setLast(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000002;
+ last_ = value;
+ onChanged();
+ }
+
+ // @@protoc_insertion_point(builder_scope:crunch.Person)
+ }
+
+ static {
+ defaultInstance = new Person(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:crunch.Person)
+ }
+
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_crunch_Person_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_crunch_Person_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n)crunch-core/src/it/resources/person.pr" +
+ "oto\022\006crunch\"%\n\006Person\022\r\n\005first\030\001 \001(\t\022\014\n\004" +
+ "last\030\002 \001(\tB\'\n\025org.apache.crunch.libB\014Per" +
+ "sonProtosH\001"
+ };
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ internal_static_crunch_Person_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_crunch_Person_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_crunch_Person_descriptor,
+ new java.lang.String[] { "First", "Last", },
+ org.apache.crunch.lib.PersonProtos.Person.class,
+ org.apache.crunch.lib.PersonProtos.Person.Builder.class);
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/39e45044/crunch-core/src/it/resources/person.proto
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/person.proto b/crunch-core/src/it/resources/person.proto
new file mode 100644
index 0000000..b973234
--- /dev/null
+++ b/crunch-core/src/it/resources/person.proto
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package crunch;
+
+option java_package = "org.apache.crunch.lib";
+option java_outer_classname = "PersonProtos";
+
+option optimize_for = SPEED;
+
+message Person { // two optional string fields; compiled into org.apache.crunch.lib.PersonProtos.Person
+ optional string first = 1;
+ optional string last = 2;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/39e45044/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
index d087ca3..89464ac 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -359,6 +359,14 @@ public class Writables {
return new WritableTableType((WritableType) key, (WritableType) value);
}
+  /**
+   * Presents a {@link Writable} as a {@link BytesWritable}.
+   *
+   * <p>A value that already is a {@code BytesWritable} is returned unchanged (same instance)
+   * rather than being serialized again. Any other writable is serialized with
+   * {@code WritableUtils.toByteArray} and the resulting bytes are wrapped in a new
+   * {@code BytesWritable}.
+   *
+   * @param w the writable to convert
+   * @return {@code w} itself when it is a {@code BytesWritable}, otherwise a new
+   *     {@code BytesWritable} holding its serialized form
+   */
+  private static BytesWritable asBytesWritable(Writable w) {
+    return (w instanceof BytesWritable)
+        ? (BytesWritable) w
+        : new BytesWritable(WritableUtils.toByteArray(w));
+  }
+
private static <W extends Writable> W create(Class<W> clazz, Writable writable) {
if (clazz.equals(writable.getClass())) {
return (W) writable;
@@ -512,7 +520,7 @@ public class Writables {
values[i] = w;
written[i] = WRITABLE_CODES.inverse().get(w.getClass());
} else {
- values[i] = new BytesWritable(WritableUtils.toByteArray(w));
+ values[i] = asBytesWritable(w);
written[i] = 1; // code for BytesWritable
}
}
@@ -652,7 +660,7 @@ public class Writables {
public UnionWritable map(Union input) {
int index = input.getIndex();
Writable w = (Writable) fns.get(index).map(input.getValue());
- return new UnionWritable(index, new BytesWritable(WritableUtils.toByteArray(w)));
+ return new UnionWritable(index, asBytesWritable(w));
}
}
@@ -744,8 +752,7 @@ public class Writables {
BytesWritable[] w = new BytesWritable[input.size()];
int index = 0;
for (T in : input) {
- Writable v = (Writable) mapFn.map(in);
- w[index++] = new BytesWritable(WritableUtils.toByteArray(v));
+ w[index++] = asBytesWritable((Writable) mapFn.map(in));
}
arrayWritable.set(w);
return arrayWritable;
@@ -822,7 +829,7 @@ public class Writables {
TextMapWritable tmw = new TextMapWritable();
for (Map.Entry<String, T> e : input.entrySet()) {
Writable w = mapFn.map(e.getValue());
- tmw.put(new Text(e.getKey()), new BytesWritable(WritableUtils.toByteArray(w)));
+ tmw.put(new Text(e.getKey()), asBytesWritable(w));
}
return tmw;
}