You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2011/06/16 00:21:31 UTC
svn commit: r1136222 - in /hadoop/common/trunk/common: ./
src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/util/
src/test/core/org/apache/hadoop/io/ src/test/core/org/apache/hadoop/ipc/
src/test/core/org/apache/hadoop/util/
Author: todd
Date: Wed Jun 15 22:21:31 2011
New Revision: 1136222
URL: http://svn.apache.org/viewvc?rev=1136222&view=rev
Log:
HADOOP-7379. Add the ability to serialize and deserialize protocol buffers in ObjectWritable. Contributed by Todd Lipcon.
Added:
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java
hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java
Modified:
hadoop/common/trunk/common/CHANGES.txt
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/common/trunk/common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/CHANGES.txt?rev=1136222&r1=1136221&r2=1136222&view=diff
==============================================================================
--- hadoop/common/trunk/common/CHANGES.txt (original)
+++ hadoop/common/trunk/common/CHANGES.txt Wed Jun 15 22:21:31 2011
@@ -44,6 +44,9 @@ Trunk (unreleased changes)
HADOOP-7144. Expose JMX metrics via JSON servlet. (Robert Joseph Evans via
cdouglas)
+ HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
+ in ObjectWritable. (todd)
+
IMPROVEMENTS
HADOOP-7042. Updates to test-patch.sh to include failed test names and
Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Adapter that lets a {@link DataOutput} be used wherever an
+ * {@link OutputStream} is expected. Every write is forwarded to the
+ * wrapped DataOutput.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class DataOutputOutputStream extends OutputStream {
+
+  /** Destination that receives every byte written to this stream. */
+  private final DataOutput delegate;
+
+  private DataOutputOutputStream(DataOutput out) {
+    this.delegate = out;
+  }
+
+  /**
+   * Obtain an OutputStream view of the given DataOutput. When 'out'
+   * already is an OutputStream it is returned as-is; otherwise a new
+   * wrapper that forwards to it is created.
+   * @param out the DataOutput to expose as a stream
+   * @return an OutputStream whose writes end up in 'out'
+   */
+  public static OutputStream constructOutputStream(DataOutput out) {
+    return (out instanceof OutputStream)
+        ? (OutputStream) out
+        : new DataOutputOutputStream(out);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    delegate.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    delegate.write(b, off, len);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    delegate.writeByte(b);
+  }
+
+}
Modified: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java?rev=1136222&r1=1136221&r2=1136222&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java (original)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java Wed Jun 15 22:21:31 2011
@@ -19,6 +19,8 @@
package org.apache.hadoop.io;
import java.lang.reflect.Array;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.io.*;
import java.util.*;
@@ -26,6 +28,9 @@ import java.util.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.ProtoUtil;
+
+import com.google.protobuf.Message;
/** A polymorphic Writable that writes an instance with its class name.
* Handles arrays, strings and primitive types without a Writable wrapper.
@@ -191,6 +196,9 @@ public class ObjectWritable implements W
UTF8.writeString(out, instance.getClass().getName());
((Writable)instance).write(out);
+ } else if (Message.class.isAssignableFrom(declaredClass)) {
+ ((Message)instance).writeDelimitedTo(
+ DataOutputOutputStream.constructOutputStream(out));
} else {
throw new IOException("Can't write: "+instance+" as "+declaredClass);
}
@@ -261,6 +269,8 @@ public class ObjectWritable implements W
instance = UTF8.readString(in);
} else if (declaredClass.isEnum()) { // enum
instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
+ } else if (Message.class.isAssignableFrom(declaredClass)) {
+ instance = tryInstantiateProtobuf(declaredClass, in);
} else { // Writable
Class instanceClass = null;
String str = UTF8.readString(in);
@@ -286,6 +296,67 @@ public class ObjectWritable implements W
}
/**
+ * Try to instantiate a protocol buffer of the given message class
+ * from the given input stream.
+ *
+ * @param protoClass the class of the generated protocol buffer
+ * @param dataIn the input stream to read from
+ * @return the instantiated Message instance
+ * @throws IOException if an IO problem occurs
+ */
+ private static Message tryInstantiateProtobuf(
+     Class<?> protoClass,
+     DataInput dataIn) throws IOException {
+
+   try {
+     if (dataIn instanceof InputStream) {
+       // The generated parseDelimitedFrom reads the length prefix and the
+       // payload straight from the stream, so no intermediate copy is needed.
+       Method parseMethod = getStaticProtobufMethod(protoClass,
+           "parseDelimitedFrom", InputStream.class);
+       Message parsed =
+           (Message)parseMethod.invoke(null, (InputStream)dataIn);
+       if (parsed == null) {
+         // parseDelimitedFrom reports end-of-stream by returning null.
+         // Surface that as an EOFException, matching the DataInput branch
+         // below (where readByte/readFully throw at EOF), rather than
+         // handing a null "message" back to the caller.
+         throw new EOFException(
+             "Reached end of stream while reading " + protoClass);
+       }
+       return parsed;
+     } else {
+       // protobuf cannot read from a DataInput directly, so the message is
+       // copied into a buffer first.
+
+       // Read the size delimiter that writeDelimitedTo writes
+       int size = ProtoUtil.readRawVarint32(dataIn);
+       if (size < 0) {
+         throw new IOException("Invalid size: " + size);
+       }
+
+       byte[] data = new byte[size];
+       dataIn.readFully(data);
+       Method parseMethod = getStaticProtobufMethod(protoClass,
+           "parseFrom", byte[].class);
+       return (Message)parseMethod.invoke(null, data);
+     }
+   } catch (InvocationTargetException e) {
+     // Unwrap whatever the parse method itself threw: IOExceptions are
+     // rethrown unchanged, anything else is wrapped in one.
+     Throwable cause = e.getCause();
+     if (cause instanceof IOException) {
+       throw (IOException)cause;
+     }
+     throw new IOException(cause);
+   } catch (IllegalAccessException iae) {
+     // Keep the reflective failure attached as the cause so the stack trace
+     // shows why the parse method could not be called.
+     AssertionError err = new AssertionError(
+         "Could not access parse method in " + protoClass);
+     err.initCause(iae);
+     throw err;
+   }
+ }
+
+ /**
+  * Look up a public method on a generated protocol buffer class.
+  *
+  * @param declaredClass the generated protocol buffer class to inspect
+  * @param method the name of the method to find
+  * @param args the parameter types of the method
+  * @return the matching Method
+  * @throws AssertionError if the class has no such accessible method; the
+  *         underlying reflection failure is attached as the cause
+  */
+ static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
+     Class<?> ... args) {
+
+   try {
+     return declaredClass.getMethod(method, args);
+   } catch (NoSuchMethodException e) {
+     throw missingProtobufMethod(declaredClass, method, args, e);
+   } catch (SecurityException e) {
+     throw missingProtobufMethod(declaredClass, method, args, e);
+   }
+ }
+
+ /**
+  * Build the error raised when a protobuf class lacks an expected method,
+  * naming the signature that was actually requested.
+  */
+ private static AssertionError missingProtobufMethod(Class<?> declaredClass,
+     String method, Class<?>[] args, Exception cause) {
+   StringBuilder signature = new StringBuilder(method).append('(');
+   if (args != null) {
+     for (int i = 0; i < args.length; i++) {
+       if (i > 0) {
+         signature.append(", ");
+       }
+       signature.append(args[i].getSimpleName());
+     }
+   }
+   signature.append(')');
+
+   // This is a bug in Hadoop - protobufs should all have this static method
+   AssertionError err = new AssertionError("Protocol buffer class " +
+       declaredClass + " does not have an accessible " + signature +
+       " method!");
+   err.initCause(cause);
+   return err;
+ }
+
+ /**
* Find and load the class with given name <tt>className</tt> by first finding
* it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
* try load it directly.
Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * Helpers for reading data in the Protocol Buffers wire format.
+ */
+public abstract class ProtoUtil {
+
+  /**
+   * Decode a 32-bit integer stored as a protobuf-style varint: seven payload
+   * bits per byte, least-significant group first, with the high bit of each
+   * byte flagging that another byte follows.
+   * @param in the input to read the encoded bytes from
+   * @return the decoded integer
+   * @throws IOException if the varint is malformed or the input ends early
+   */
+  public static int readRawVarint32(DataInput in) throws IOException {
+    int value = 0;
+
+    // The first five bytes carry the 32 significant bits (7 bits apiece;
+    // anything shifted beyond bit 31 simply falls off the int).
+    for (int shift = 0; shift < 35; shift += 7) {
+      byte b = in.readByte();
+      value |= (b & 0x7f) << shift;
+      if (b >= 0) {
+        return value;
+      }
+    }
+
+    // Discard upper 32 bits: up to five further bytes may follow, whose
+    // payload is ignored.
+    for (int extra = 0; extra < 5; extra++) {
+      if (in.readByte() >= 0) {
+        return value;
+      }
+    }
+    throw new IOException("Malformed varint");
+  }
+
+}
Added: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java (added)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Message;
+
+/**
+ * Checks that Protocol Buffer messages survive a round trip through
+ * ObjectWritable.
+ */
+public class TestObjectWritableProtos {
+
+  @Test
+  public void testProtoBufs() throws IOException {
+    doTest(1);
+  }
+
+  @Test
+  public void testProtoBufs2() throws IOException {
+    doTest(2);
+  }
+
+  @Test
+  public void testProtoBufs3() throws IOException {
+    doTest(3);
+  }
+
+  /**
+   * Serialize 'numProtos' protobufs back to back into one buffer, then
+   * deserialize them in order and compare each with what was written.
+   */
+  private void doTest(int numProtos) throws IOException {
+    Configuration conf = new Configuration();
+    DataOutputBuffer serialized = new DataOutputBuffer();
+
+    // Build and write the messages, using a proto type that ships with
+    // the protobuf library itself.
+    Message[] expected = new Message[numProtos];
+    for (int idx = 0; idx < expected.length; idx++) {
+      Message proto = DescriptorProtos.EnumValueDescriptorProto.newBuilder()
+          .setName("test" + idx)
+          .setNumber(idx)
+          .build();
+      ObjectWritable.writeObject(serialized, proto,
+          DescriptorProtos.EnumValueDescriptorProto.class, conf);
+      expected[idx] = proto;
+    }
+
+    // Read everything back from the bytes that were actually written
+    DataInputBuffer in = new DataInputBuffer();
+    in.reset(serialized.getData(), serialized.getLength());
+
+    for (Message want : expected) {
+      Message got = (Message)ObjectWritable.readObject(in, conf);
+      assertEquals(want, got);
+    }
+  }
+
+}
Modified: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1136222&r1=1136221&r2=1136222&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java Wed Jun 15 22:21:31 2011
@@ -40,6 +40,10 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.AccessControlException;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
+
import static org.apache.hadoop.test.MetricsAsserts.*;
import static org.mockito.Mockito.*;
@@ -71,6 +75,9 @@ public class TestRPC extends TestCase {
int error() throws IOException;
void testServerGet() throws IOException;
int[] exchange(int[] values) throws IOException;
+
+ DescriptorProtos.EnumDescriptorProto exchangeProto(
+ DescriptorProtos.EnumDescriptorProto arg);
}
public static class TestImpl implements TestProtocol {
@@ -136,6 +143,11 @@ public class TestRPC extends TestCase {
}
return values;
}
+
+ /** Returns the given protobuf argument to the caller unchanged. */
+ @Override
+ public EnumDescriptorProto exchangeProto(EnumDescriptorProto arg) {
+ return arg;
+ }
}
//
@@ -314,6 +326,13 @@ public class TestRPC extends TestCase {
intResult = proxy.add(new int[] {1, 2});
assertEquals(intResult, 3);
+
+ // Test protobufs
+ EnumDescriptorProto sendProto =
+ EnumDescriptorProto.newBuilder().setName("test").build();
+ EnumDescriptorProto retProto = proxy.exchangeProto(sendProto);
+ assertEquals(sendProto, retProto);
+ assertNotSame(sendProto, retProto);
boolean caught = false;
try {
Added: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java (added)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import com.google.protobuf.CodedOutputStream;
+
+public class TestProtoUtil {
+
+  /**
+   * Hand-picked integers to round-trip through the varint encoding
+   */
+  private static final int[] TEST_VINT_VALUES = {
+    0, 1, -1, 127, 128, 129, 255, 256, 257,
+    0x1234, -0x1234,
+    0x123456, -0x123456,
+    0x12345678, -0x12345678
+  };
+
+  /**
+   * Verify that ProtoUtil.readRawVarint32 decodes what ProtoBuf's
+   * CodedOutputStream encodes.
+   */
+  @Test
+  public void testVarInt() throws IOException {
+    // Fixed sample values first
+    for (int idx = 0; idx < TEST_VINT_VALUES.length; idx++) {
+      doVarIntTest(TEST_VINT_VALUES[idx]);
+    }
+
+    // Then walk a single 1-bit through every position; the loop ends once
+    // the bit has been shifted out of the int.
+    int bit = 1;
+    while (bit != 0) {
+      doVarIntTest(bit);
+      doVarIntTest(-bit);
+      doVarIntTest(bit - 1);
+      doVarIntTest(~bit);
+      bit <<= 1;
+    }
+  }
+
+  /**
+   * Encode 'value' with CodedOutputStream and check that
+   * ProtoUtil.readRawVarint32 reads the same value back.
+   */
+  private void doVarIntTest(int value) throws IOException {
+    ByteArrayOutputStream encodedBytes = new ByteArrayOutputStream();
+    CodedOutputStream encoder = CodedOutputStream.newInstance(encodedBytes);
+    encoder.writeRawVarint32(value);
+    encoder.flush();
+
+    ByteArrayInputStream rawIn =
+        new ByteArrayInputStream(encodedBytes.toByteArray());
+    int decoded = ProtoUtil.readRawVarint32(new DataInputStream(rawIn));
+    assertEquals(value, decoded);
+  }
+}