You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2011/06/16 00:21:31 UTC

svn commit: r1136222 - in /hadoop/common/trunk/common: ./ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/util/ src/test/core/org/apache/hadoop/io/ src/test/core/org/apache/hadoop/ipc/ src/test/core/org/apache/hadoop/util/

Author: todd
Date: Wed Jun 15 22:21:31 2011
New Revision: 1136222

URL: http://svn.apache.org/viewvc?rev=1136222&view=rev
Log:
HADOOP-7379. Add the ability to serialize and deserialize protocol buffers in ObjectWritable. Contributed by Todd Lipcon.

Added:
    hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java
    hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java
    hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java
    hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java
Modified:
    hadoop/common/trunk/common/CHANGES.txt
    hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java
    hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/trunk/common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/CHANGES.txt?rev=1136222&r1=1136221&r2=1136222&view=diff
==============================================================================
--- hadoop/common/trunk/common/CHANGES.txt (original)
+++ hadoop/common/trunk/common/CHANGES.txt Wed Jun 15 22:21:31 2011
@@ -44,6 +44,9 @@ Trunk (unreleased changes)
     HADOOP-7144. Expose JMX metrics via JSON servlet. (Robert Joseph Evans via
     cdouglas)
 
+    HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
+    in ObjectWritable. (todd)
+
   IMPROVEMENTS
 
     HADOOP-7042. Updates to test-patch.sh to include failed test names and

Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * OutputStream implementation that wraps a DataOutput.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class DataOutputOutputStream extends OutputStream {
+
+  private final DataOutput out;
+
+  /**
+   * Adapt a DataOutput to the OutputStream API. A DataOutput that is
+   * itself an OutputStream is handed back unchanged; anything else is
+   * wrapped in a new DataOutputOutputStream.
+   * @param out the DataOutput to wrap
+   * @return an OutputStream instance that outputs to 'out'
+   */
+  public static OutputStream constructOutputStream(DataOutput out) {
+    return (out instanceof OutputStream)
+        ? (OutputStream)out
+        : new DataOutputOutputStream(out);
+  }
+
+  private DataOutputOutputStream(DataOutput out) {
+    this.out = out;
+  }
+
+  /** Forwards a single byte via {@link DataOutput#writeByte(int)}. */
+  @Override
+  public void write(int b) throws IOException {
+    this.out.writeByte(b);
+  }
+
+  /** Forwards the entire array in one call. */
+  @Override
+  public void write(byte[] b) throws IOException {
+    this.out.write(b);
+  }
+
+  /** Forwards {@code len} bytes of {@code b}, starting at {@code off}. */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    this.out.write(b, off, len);
+  }
+}

Modified: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java?rev=1136222&r1=1136221&r2=1136222&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java (original)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java Wed Jun 15 22:21:31 2011
@@ -19,6 +19,8 @@
 package org.apache.hadoop.io;
 
 import java.lang.reflect.Array;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
 import java.io.*;
 import java.util.*;
@@ -26,6 +28,9 @@ import java.util.*;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.ProtoUtil;
+
+import com.google.protobuf.Message;
 
 /** A polymorphic Writable that writes an instance with it's class name.
  * Handles arrays, strings and primitive types without a Writable wrapper.
@@ -191,6 +196,9 @@ public class ObjectWritable implements W
       UTF8.writeString(out, instance.getClass().getName());
       ((Writable)instance).write(out);
 
+    } else if (Message.class.isAssignableFrom(declaredClass)) {
+      ((Message)instance).writeDelimitedTo(
+          DataOutputOutputStream.constructOutputStream(out));
     } else {
       throw new IOException("Can't write: "+instance+" as "+declaredClass);
     }
@@ -261,6 +269,8 @@ public class ObjectWritable implements W
       instance = UTF8.readString(in);
     } else if (declaredClass.isEnum()) {         // enum
       instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
+    } else if (Message.class.isAssignableFrom(declaredClass)) {
+      instance = tryInstantiateProtobuf(declaredClass, in);
     } else {                                      // Writable
       Class instanceClass = null;
       String str = UTF8.readString(in);
@@ -286,6 +296,67 @@ public class ObjectWritable implements W
   }
 
   /**
+   * Read one length-delimited protocol buffer of the given message class
+   * from the given input, i.e. the format written by writeDelimitedTo.
+   * 
+   * @param protoClass the class of the generated protocol buffer
+   * @param dataIn the input stream to read from
+   * @return the instantiated Message instance, never null
+   * @throws IOException if an IO problem occurs or the input is at EOF
+   */
+  private static Message tryInstantiateProtobuf(
+      Class<?> protoClass,
+      DataInput dataIn) throws IOException {
+
+    try {
+      if (dataIn instanceof InputStream) {
+        // Built-in parseDelimitedFrom reads the stream directly (no re-copy)
+        Method parseMethod = getStaticProtobufMethod(protoClass,
+            "parseDelimitedFrom", InputStream.class);
+        Message msg = (Message)parseMethod.invoke(null, (InputStream)dataIn);
+        if (msg == null) {
+          // null means EOF; the DataInput branch throws EOFException too
+          throw new EOFException("EOF reading " + protoClass.getName());
+        }
+        return msg;
+      }
+      // Have to read it into a buffer first, since protobuf doesn't deal
+      // with the DataInput interface directly. Start with the varint size
+      // delimiter that writeDelimitedTo writes.
+      int size = ProtoUtil.readRawVarint32(dataIn);
+      if (size < 0) {
+        throw new IOException("Invalid size: " + size);
+      }
+      byte[] data = new byte[size];
+      dataIn.readFully(data);
+      Method parseMethod = getStaticProtobufMethod(protoClass,
+          "parseFrom", byte[].class);
+      return (Message)parseMethod.invoke(null, data);
+    } catch (InvocationTargetException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException)e.getCause();
+      }
+      throw new IOException(e.getCause());
+    } catch (IllegalAccessException iae) {
+      // Keep the reflective failure attached as the cause for diagnosis
+      throw (AssertionError)new AssertionError(
+          "Could not access parse method in " + protoClass).initCause(iae);
+    }
+  }
+
+  /** Find public 'method(args)' on a generated protobuf class; never null. */
+  static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
+      Class<?> ... args) {
+    try {
+      return declaredClass.getMethod(method, args);
+    } catch (Exception e) { // Hadoop bug: protobufs all have these methods
+      throw (AssertionError)new AssertionError("Protocol buffer class " +
+          declaredClass + " does not have an accessible " + method +
+          " method!").initCause(e);
+    }
+  }
+
+  /**
    * Find and load the class with given name <tt>className</tt> by first finding
    * it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
    * try load it directly.

Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+public abstract class ProtoUtil {
+
+  /**
+   * Read a variable length integer in the same format that ProtoBufs encodes.
+   * @param in the input to read from, one byte at a time
+   * @return the decoded value; bits beyond the low 32 are discarded
+   * @throws IOException if ten bytes in a row ask for another byte, or on EOF.
+   */
+  public static int readRawVarint32(DataInput in) throws IOException {
+    byte tmp = in.readByte();  // a negative byte has its high bit set
+    if (tmp >= 0) {  // high bit clear: this was the only byte
+      return tmp;
+    }
+    int result = tmp & 0x7f;  // low 7 bits of each byte are payload
+    if ((tmp = in.readByte()) >= 0) {
+      result |= tmp << 7;
+    } else {
+      result |= (tmp & 0x7f) << 7;
+      if ((tmp = in.readByte()) >= 0) {
+        result |= tmp << 14;
+      } else {
+        result |= (tmp & 0x7f) << 14;
+        if ((tmp = in.readByte()) >= 0) {
+          result |= tmp << 21;
+        } else {
+          result |= (tmp & 0x7f) << 21;
+          result |= (tmp = in.readByte()) << 28;  // only 4 bits fit in the int
+          if (tmp < 0) {
+            // Discard upper 32 bits.
+            for (int i = 0; i < 5; i++) {  // at most 5 further bytes accepted
+              if (in.readByte() >= 0) {
+                return result;
+              }
+            }
+            throw new IOException("Malformed varint");
+          }
+        }
+      }
+    }
+    return result;
+  }
+
+}

Added: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java (added)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Message;
+
+/**
+ * Round-trip tests for Protocol Buffer messages through ObjectWritable.
+ */
+public class TestObjectWritableProtos {
+
+  @Test
+  public void testProtoBufs() throws IOException {
+    doTest(1);  // a single message
+  }
+
+  @Test
+  public void testProtoBufs2() throws IOException {
+    doTest(2);  // consecutive messages sharing one buffer
+  }
+  
+  @Test
+  public void testProtoBufs3() throws IOException {
+    doTest(3);
+  }
+  
+  /**
+   * Write 'numProtos' distinct protobufs back-to-back into a single buffer,
+   * then read them back in order, making sure each equals what was sent.
+   */
+  private void doTest(int numProtos) throws IOException {
+    Configuration conf = new Configuration();
+    DataOutputBuffer out = new DataOutputBuffer();
+
+    // Write numProtos protobufs to the buffer
+    Message[] sent = new Message[numProtos];
+    for (int i = 0; i < numProtos; i++) {
+      // Construct a test protocol buffer using one of the protos that
+      // ships with the protobuf library; name and number vary with i
+      Message testProto = DescriptorProtos.EnumValueDescriptorProto.newBuilder()
+        .setName("test" + i).setNumber(i).build();
+      ObjectWritable.writeObject(out, testProto,
+          DescriptorProtos.EnumValueDescriptorProto.class, conf);
+      sent[i] = testProto;
+    }
+
+    // Read back the data, passing the reader only out.getLength() bytes
+    DataInputBuffer in = new DataInputBuffer();
+    in.reset(out.getData(), out.getLength());
+    
+    for (int i = 0; i < numProtos; i++) {
+      Message received = (Message)ObjectWritable.readObject(in, conf);
+      
+      assertEquals(sent[i], received);
+    }
+  }
+
+}

Modified: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1136222&r1=1136221&r2=1136222&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java Wed Jun 15 22:21:31 2011
@@ -40,6 +40,10 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.AccessControlException;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
+
 import static org.apache.hadoop.test.MetricsAsserts.*;
 
 import static org.mockito.Mockito.*;
@@ -71,6 +75,9 @@ public class TestRPC extends TestCase {
     int error() throws IOException;
     void testServerGet() throws IOException;
     int[] exchange(int[] values) throws IOException;
+    
+    DescriptorProtos.EnumDescriptorProto exchangeProto(
+        DescriptorProtos.EnumDescriptorProto arg);
   }
 
   public static class TestImpl implements TestProtocol {
@@ -136,6 +143,11 @@ public class TestRPC extends TestCase {
       }
       return values;
     }
+
+    @Override
+    public EnumDescriptorProto exchangeProto(EnumDescriptorProto arg) {
+      return arg;
+    }
   }
 
   //
@@ -314,6 +326,13 @@ public class TestRPC extends TestCase {
 
     intResult = proxy.add(new int[] {1, 2});
     assertEquals(intResult, 3);
+    
+    // Test protobufs
+    EnumDescriptorProto sendProto =
+      EnumDescriptorProto.newBuilder().setName("test").build();
+    EnumDescriptorProto retProto = proxy.exchangeProto(sendProto);
+    assertEquals(sendProto, retProto);
+    assertNotSame(sendProto, retProto);
 
     boolean caught = false;
     try {

Added: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java (added)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import com.google.protobuf.CodedOutputStream;
+
+public class TestProtoUtil {
+  
+  /**
+   * Varint test values: around 127/128 and 255/256, plus wider, both signs
+   */
+  private static final int[] TEST_VINT_VALUES = new int[] {
+    0, 1, -1, 127, 128, 129, 255, 256, 257,
+    0x1234, -0x1234,
+    0x123456, -0x123456,
+    0x12345678, -0x12345678
+  };
+
+  /**
+   * Test that readRawVarint32 is compatible with the varints encoded
+   * by ProtoBuf's CodedOutputStream.
+   */
+  @Test
+  public void testVarInt() throws IOException {
+    // Test a few manufactured values
+    for (int value : TEST_VINT_VALUES) {
+      doVarIntTest(value);
+    }
+    // Check 1-bits at every bit position; i reaches 0 after the 32nd shift
+    for (int i = 1; i != 0; i <<= 1) {
+      doVarIntTest(i);      // exactly one bit set
+      doVarIntTest(-i);     // two's-complement negation of that value
+      doVarIntTest(i - 1);  // every bit below that position set
+      doVarIntTest(~i);     // every bit except that position set
+    }
+  }
+  /** Encode 'value' with CodedOutputStream, then expect ProtoUtil to decode it. */
+  private void doVarIntTest(int value) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CodedOutputStream cout = CodedOutputStream.newInstance(baos);
+    cout.writeRawVarint32(value);
+    cout.flush();  // make sure the encoded bytes reach baos
+
+    DataInputStream dis = new DataInputStream(
+        new ByteArrayInputStream(baos.toByteArray()));
+    assertEquals(value, ProtoUtil.readRawVarint32(dis));
+  }
+}