Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2012/03/10 02:53:54 UTC

svn commit: r1299141 - in /hadoop/common/branches/branch-1.0: ./ src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/ src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/

Author: acmurthy
Date: Sat Mar 10 01:53:53 2012
New Revision: 1299141

URL: http://svn.apache.org/viewvc?rev=1299141&view=rev
Log:
Merge -c 1299140 from branch-1 to branch-1.0 to fix HADOOP-5450. Add support for application-specific typecodes to typed bytes. Contributed by Klaas Bosteels.

Modified:
    hadoop/common/branches/branch-1.0/CHANGES.txt
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java

Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sat Mar 10 01:53:53 2012
@@ -15,6 +15,9 @@ Release 1.0.2 - unreleased
     HADOOP-1722. Allow hadoop streaming to handle non-utf8 byte array. (Klaas
     Bosteels and Matthias Lehmann via acmurthy)
 
+    HADOOP-5450. Add support for application-specific typecodes to typed
+    bytes. (Klaas Bosteels via acmurthy) 
+
   BUG FIXES
 
     HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java Sat Mar 10 01:53:53 2012
@@ -23,6 +23,7 @@ package org.apache.hadoop.typedbytes;
  */
 public enum Type {
 
+  // codes for supported types (< 50):
   BYTES(0),
   BYTE(1),
   BOOL(2),
@@ -34,6 +35,11 @@ public enum Type {
   VECTOR(8),
   LIST(9),
   MAP(10),
+  
+  // application-specific codes (50-200):
+  WRITABLE(50),
+  
+  // low-level codes (> 200):
   MARKER(255);
 
   final int code;
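
The enum above now spells out three typecode ranges: values below 50 belong to the built-in typed bytes types, 50 through 200 are left free for application-specific payloads, and values above 200 are reserved for low-level codes such as MARKER (255). A minimal sketch of how an application might claim codes in the free range; the class and constant names here are hypothetical, not part of the Hadoop API:

    // Hypothetical application-side constants. Only the 50-200 range is
    // safe for custom payloads: lower codes are taken by the built-in
    // types (BYTES=0 ... MAP=10) and higher ones by low-level codes.
    public final class MyTypeCodes {
      public static final int THRIFT_RECORD = 100;
      public static final int AVRO_RECORD = 101;

      private MyTypeCodes() {}
    }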

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java Sat Mar 10 01:53:53 2012
@@ -101,6 +101,8 @@ public class TypedBytesInput {
       return readMap();
     } else if (code == Type.MARKER.code) {
       return null;
+    } else if (50 <= code && code <= 200) { // application-specific typecodes
+      return new Buffer(readBytes());
     } else {
       throw new RuntimeException("unknown type");
     }
@@ -146,6 +148,8 @@ public class TypedBytesInput {
       return readRawMap();
     } else if (code == Type.MARKER.code) {
       return null;
+    } else if (50 <= code && code <= 200) { // application-specific typecodes
+      return readRawBytes();
     } else {
       throw new RuntimeException("unknown type");
     }
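
With this change read() and readRaw() stop rejecting codes in the 50-200 range: read() returns the payload as a Buffer wrapping the raw bytes, and readRaw() likewise accepts these codes instead of throwing. A minimal round-trip sketch under that assumption, using org.apache.hadoop.record.Buffer; the DataOutput/DataInput constructors used below are assumed to be public, as in the test further down:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.record.Buffer;
    import org.apache.hadoop.typedbytes.TypedBytesInput;
    import org.apache.hadoop.typedbytes.TypedBytesOutput;

    public class AppSpecificRoundTrip {
      public static void main(String[] args) throws IOException {
        byte[] payload = new byte[] { 1, 2, 3 };

        // Write the payload under an application-specific code (100).
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        new TypedBytesOutput(new DataOutputStream(baos)).writeBytes(payload, 100);

        // Any code in [50, 200] comes back as a Buffer over the raw bytes.
        TypedBytesInput in = new TypedBytesInput(
            new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
        Buffer value = (Buffer) in.read();
        System.out.println(value.equals(new Buffer(payload)));  // true
      }
    }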

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java Sat Mar 10 01:53:53 2012
@@ -124,16 +124,27 @@ public class TypedBytesOutput {
   }
 
   /**
-   * Writes a bytes array as a typed bytes sequence.
+   * Writes a bytes array as a typed bytes sequence, using a given typecode.
    * 
    * @param bytes the bytes array to be written
+   * @param code the typecode to use
    * @throws IOException
    */
-  public void writeBytes(byte[] bytes) throws IOException {
-    out.write(Type.BYTES.code);
+  public void writeBytes(byte[] bytes, int code) throws IOException {
+    out.write(code);
     out.writeInt(bytes.length);
     out.write(bytes);
   }
+  
+  /**
+   * Writes a bytes array as a typed bytes sequence.
+   * 
+   * @param bytes the bytes array to be written
+   * @throws IOException
+   */
+  public void writeBytes(byte[] bytes) throws IOException {
+    writeBytes(bytes, Type.BYTES.code);
+  }
 
   /**
    * Writes a byte as a typed bytes sequence.
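
The pre-existing writeBytes(byte[]) now delegates to the new writeBytes(byte[], int) overload with Type.BYTES.code. The overload does not validate the code it is given, so a caller that wants to stay inside the application-specific range has to enforce that itself; a small hypothetical guard, not part of this patch, with the usual imports assumed:

    // Keep custom payloads inside 50-200 so they cannot collide with the
    // built-in typecodes or with the low-level codes above 200.
    static void writeAppSpecific(TypedBytesOutput out, byte[] bytes, int code)
        throws IOException {
      if (code < 50 || code > 200) {
        throw new IllegalArgumentException(
            "application-specific typecodes must be in [50, 200], got " + code);
      }
      out.writeBytes(bytes, code);
    }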

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java Sat Mar 10 01:53:53 2012
@@ -18,9 +18,13 @@
 
 package org.apache.hadoop.typedbytes;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
@@ -36,17 +40,22 @@ import org.apache.hadoop.io.VIntWritable
 import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Provides functionality for reading typed bytes as Writable objects.
  * 
  * @see TypedBytesInput
  */
-public class TypedBytesWritableInput {
+public class TypedBytesWritableInput implements Configurable {
 
   private TypedBytesInput in;
+  private Configuration conf;
 
-  private TypedBytesWritableInput() {}
+  private TypedBytesWritableInput() {
+    conf = new Configuration();
+  }
 
   private void setTypedBytesInput(TypedBytesInput in) {
     this.in = in;
@@ -86,6 +95,7 @@ public class TypedBytesWritableInput {
 
   /** Creates a new instance of TypedBytesWritableInput. */
   public TypedBytesWritableInput(TypedBytesInput in) {
+    this();
     this.in = in;
   }
 
@@ -120,6 +130,8 @@ public class TypedBytesWritableInput {
       return readArray();
     case MAP:
       return readMap();
+    case WRITABLE:
+      return readWritable();
     default:
       throw new RuntimeException("unknown type");
     }
@@ -151,6 +163,8 @@ public class TypedBytesWritableInput {
       return ArrayWritable.class;
     case MAP:
       return MapWritable.class;
+    case WRITABLE:
+      return Writable.class;
     default:
       throw new RuntimeException("unknown type");
     }
@@ -331,5 +345,36 @@ public class TypedBytesWritableInput {
   public SortedMapWritable readSortedMap() throws IOException {
     return readSortedMap(null);
   }
+  
+  public Writable readWritable(Writable writable) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(in.readBytes());
+    DataInputStream dis = new DataInputStream(bais);
+    String className = WritableUtils.readString(dis);
+    if (writable == null) {
+      try {
+        Class<? extends Writable> cls = 
+          conf.getClassByName(className).asSubclass(Writable.class);
+        writable = (Writable) ReflectionUtils.newInstance(cls, conf);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+    } else if (!writable.getClass().getName().equals(className)) {
+      throw new IOException("wrong Writable class given");
+    }
+    writable.readFields(dis);
+    return writable;
+  }
+
+  public Writable readWritable() throws IOException {
+    return readWritable(null);
+  }
 
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+  
 }
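
TypedBytesWritableInput now implements Configurable because readWritable() resolves the serialized class by name via conf.getClassByName(); the default is a fresh Configuration, but callers can inject one whose classloader knows about job-specific Writable classes. A short sketch under that assumption (the helper name and the usual imports are mine, not part of the patch):

    // Inject a Configuration whose classloader can resolve user-defined
    // Writable classes before deserializing WRITABLE (code 50) records.
    static Writable readWithJobClasses(DataInputStream dis, ClassLoader loader)
        throws IOException {
      Configuration conf = new Configuration();
      conf.setClassLoader(loader);
      TypedBytesWritableInput in = new TypedBytesWritableInput(dis);
      in.setConf(conf);
      return in.read();  // dispatches to readWritable() for typecode 50
    }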

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java Sat Mar 10 01:53:53 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.VIntWritable
 import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.record.Record;
 
 /**
@@ -91,6 +92,7 @@ public class TypedBytesWritableOutput {
 
   /** Creates a new instance of TypedBytesWritableOutput. */
   public TypedBytesWritableOutput(TypedBytesOutput out) {
+    this();
     this.out = out;
   }
 
@@ -209,13 +211,12 @@ public class TypedBytesWritableOutput {
   }
 
   public void writeWritable(Writable w) throws IOException {
-    out.writeVectorHeader(2);
-    out.writeString(w.getClass().getName());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos);
+    WritableUtils.writeString(dos, w.getClass().getName());
     w.write(dos);
     dos.close();
-    out.writeBytes(baos.toByteArray());
+    out.writeBytes(baos.toByteArray(), Type.WRITABLE.code);
   }
 
 }
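
writeWritable() used to encode an arbitrary Writable as a two-element vector (class name string followed by a BYTES blob); it now prepends the class name with WritableUtils.writeString and frames the result under the new WRITABLE typecode, which readWritable() unwraps on the input side. A minimal round trip mirroring what the updated test does with ObjectWritable; the wrapping constructors used here are assumptions based on this patch and the test:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.ObjectWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.typedbytes.TypedBytesOutput;
    import org.apache.hadoop.typedbytes.TypedBytesWritableInput;
    import org.apache.hadoop.typedbytes.TypedBytesWritableOutput;

    public class WritableRoundTrip {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        TypedBytesWritableOutput out = new TypedBytesWritableOutput(
            new TypedBytesOutput(new DataOutputStream(baos)));
        // ObjectWritable is not one of the specially handled Writables, so
        // write() falls through to writeWritable() and typecode WRITABLE.
        out.write(new ObjectWritable("hello"));

        TypedBytesWritableInput in = new TypedBytesWritableInput(
            new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
        Writable w = in.read();  // reconstructed via readWritable()
        System.out.println(w);   // an ObjectWritable wrapping "hello"
      }
    }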

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html Sat Mar 10 01:53:53 2012
@@ -41,6 +41,8 @@ Each typed bytes sequence starts with an
 <tr><td><i>10</i></td><td>A map.</td></tr>
 </table>
 </p>
+The type codes <i>50</i> to <i>200</i> are treated as aliases for <i>0</i>, and can thus be used for
+application-specific serialization.
 
 <h3>Subsequent Bytes</h3>
 

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java Sat Mar 10 01:53:53 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.VIntWritable;
 import org.apache.hadoop.io.VLongWritable;
@@ -80,6 +81,7 @@ public class TestIO extends TestCase {
       (byte) 123, true, 12345, 123456789L, (float) 1.2, 1.234,
       "random string", vector, list, map 
     };
+    byte[] appSpecificBytes = new byte[] { 1, 2, 3 };
 
     FileOutputStream ostream = new FileOutputStream(tmpfile);
     DataOutputStream dostream = new DataOutputStream(ostream);
@@ -87,6 +89,7 @@ public class TestIO extends TestCase {
     for (Object obj : objects) {
       out.write(obj);
     }
+    out.writeBytes(appSpecificBytes, 100);
     dostream.close();
     ostream.close();
 
@@ -96,6 +99,7 @@ public class TestIO extends TestCase {
     for (Object obj : objects) {
       assertEquals(obj, in.read());
     }
+    assertEquals(new Buffer(appSpecificBytes), in.read());
     distream.close();
     istream.close();
 
@@ -114,6 +118,9 @@ public class TestIO extends TestCase {
       dis = new DataInputStream(bais);
       assertEquals(obj, (new TypedBytesInput(dis)).read());
     }
+    byte[] rawBytes = in.readRaw();
+    assertEquals(new Buffer(appSpecificBytes),
+      new Buffer(rawBytes, 5, rawBytes.length - 5));
     distream.close();
     istream.close();
   }
@@ -164,7 +171,8 @@ public class TestIO extends TestCase {
       new ByteWritable((byte) 123), new BooleanWritable(true),
       new VIntWritable(12345), new VLongWritable(123456789L),
       new FloatWritable((float) 1.2), new DoubleWritable(1.234),
-      new Text("random string")
+      new Text("random string"),
+      new ObjectWritable("test")
     };
     TypedBytesWritable tbw = new TypedBytesWritable();
     tbw.setValue("typed bytes text");
@@ -201,7 +209,7 @@ public class TestIO extends TestCase {
 
     TypedBytesWritableInput in = new TypedBytesWritableInput(distream);
     for (Writable w : writables) {
-      assertEquals(w, in.read());
+      assertEquals(w.toString(), in.read().toString());
     }
 
     assertEquals(tbw.getValue().toString(), in.read().toString());