Posted to commits@hive.apache.org by zs...@apache.org on 2009/09/01 23:36:29 UTC

svn commit: r810252 [1/3] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/apache/hadoop/hive/contrib/serde2/ contrib/src/java/org/apache/hadoop/hive/contrib/util/ contrib/src/java/org/apache/hadoop/hi...

Author: zshao
Date: Tue Sep  1 21:36:28 2009
New Revision: 810252

URL: http://svn.apache.org/viewvc?rev=810252&view=rev
Log:
HIVE-785. Add RecordWriter for ScriptOperator. (Namit Jain via zshao)

Added:
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/Type.java
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesInput.java
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesOutput.java
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordInput.java
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordOutput.java
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordReader.java
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordWriter.java
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritable.java
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableInput.java
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableOutput.java
    hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java
    hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesOutput.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesRecordInput.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesRecordOutput.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesWritable.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesWritableInput.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesWritableOutput.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
    hadoop/hive/trunk/ql/src/test/results/clientnegative/script_error.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input14.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input14_limit.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input17.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input18.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input20.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input33.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input34.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input35.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input36.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input38.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce7.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/nullscript.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_transform.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/regexp_extract.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/transform_ppr1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/transform_ppr2.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/parse/input20.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/parse/input4.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/parse/input5.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=810252&r1=810251&r2=810252&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Sep  1 21:36:28 2009
@@ -6,7 +6,7 @@
 
   NEW FEATURES
   
-    HIVE-743. Let user specify serde for custom sctipts.
+    HIVE-743. Let user specify serde for custom scripts.
     (Namit Jain via rmurthy)
 
     HIVE-749. add hive.optimize.pruner
@@ -33,6 +33,8 @@
     HIVE-645. UDF to run JDBC
     (Edward Capriolo via namit)
 
+    HIVE-785. Add RecordWriter for ScriptOperator. (Namit Jain via zshao)
+
   IMPROVEMENTS
 
     HIVE-760. Add version info to META-INF/MANIFEST.MF.

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=810252&r1=810251&r2=810252&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Sep  1 21:36:28 2009
@@ -137,6 +137,7 @@
     // Default serde and record reader for user scripts
     HIVESCRIPTSERDE("hive.script.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
     HIVESCRIPTRECORDREADER("hive.script.recordreader", "org.apache.hadoop.hive.ql.exec.TextRecordReader"),
+    HIVESCRIPTRECORDWRITER("hive.script.recordwriter", "org.apache.hadoop.hive.ql.exec.TextRecordWriter"),
     
     // HWI
     HIVEHWILISTENHOST("hive.hwi.listen.host","0.0.0.0"),
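
The new default is resolved at runtime like any other HiveConf entry. The sketch below is illustrative only, not the exact ScriptOperator wiring; it assumes just the HiveConf.getVar accessor and Hadoop's ReflectionUtils:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.util.ReflectionUtils;

    public class RecordWriterResolverSketch {
      // Resolves the writer class named by hive.script.recordwriter,
      // falling back to TextRecordWriter unless overridden in hive-site.xml.
      public static Object resolve(HiveConf conf) throws Exception {
        String cls = HiveConf.getVar(conf,
            HiveConf.ConfVars.HIVESCRIPTRECORDWRITER);
        return ReflectionUtils.newInstance(Class.forName(cls), conf);
      }
    }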

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=810252&r1=810251&r2=810252&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Tue Sep  1 21:36:28 2009
@@ -351,4 +351,10 @@
   <description>The default record reader for reading data from the user scripts. </description>
 </property>
 
+<property>
+  <name>hive.script.recordwriter</name>
+  <value>org.apache.hadoop.hive.ql.exec.TextRecordWriter</value>
+  <description>The default record writer for writing data to the user scripts. </description>
+</property>
+
 </configuration>

Modified: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java?rev=810252&r1=810251&r2=810252&view=diff
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java (original)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java Tue Sep  1 21:36:28 2009
@@ -26,9 +26,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.util.typedbytes.Type;
-import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableInput;
-import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableOutput;
+import org.apache.hadoop.hive.contrib.util.typedbytes.Type;
+import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesWritableInput;
+import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesWritableOutput;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -159,11 +159,11 @@
   public Object deserialize(Writable blob) throws SerDeException {
 
     BytesWritable data = (BytesWritable)blob;
-    inBarrStr.reset(data.get(), 0, data.getSize()-1);   
-    
+    inBarrStr.reset(data.get(), 0, data.getSize());   
+
     try {
 
-      for (int i=0; i<columnNames.size(); i++) {
+      for (int i=0; i < columnNames.size(); i++) {
         row.set(i, deserializeField(tbIn, columnTypes.get(i), row.get(i)));
       }
 

Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/Type.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/Type.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/Type.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/Type.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.util.typedbytes;
+
+/**
+ * The possible type codes.
+ */
+public enum Type {
+
+  // codes for supported types (< 50):
+  BYTES(0),
+  BYTE(1),
+  BOOL(2),
+  INT(3),
+  LONG(4),
+  FLOAT(5),
+  DOUBLE(6),
+  STRING(7),
+  VECTOR(8),
+  LIST(9),
+  MAP(10),
+  SHORT(11),
+  
+  // application-specific codes (50-200):
+  WRITABLE(50),
+  
+  ENDOFRECORD(100),
+  
+  // low-level codes (> 200):
+  MARKER(255);
+
+  final int code;
+
+  Type(int code) {
+    this.code = code;
+  }
+}
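
On the wire, every value is the one-byte code above followed by a fixed-size or length-prefixed payload. A hand-rolled sketch of the encoding (class name and values are illustrative; the TypedBytesOutput added below emits the same bytes):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class WireFormatDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.write(3);            // Type.INT.code
        out.writeInt(42);        // four-byte big-endian payload
        out.write(7);            // Type.STRING.code
        out.writeInt(2);         // length prefix
        out.writeBytes("hi");    // UTF-8 payload
        // baos now holds: 03 00 00 00 2a 07 00 00 00 02 68 69
      }
    }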

Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesInput.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesInput.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesInput.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesInput.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.util.typedbytes;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.record.Buffer;
+
+/**
+ * Provides functionality for reading typed bytes.
+ */
+public class TypedBytesInput {
+
+  private DataInput in;
+
+  private TypedBytesInput() {}
+
+  private void setDataInput(DataInput in) {
+    this.in = in;
+  }
+
+  private static ThreadLocal tbIn = new ThreadLocal() {
+    protected synchronized Object initialValue() {
+      return new TypedBytesInput();
+    }
+  };
+
+  /**
+   * Get a thread-local typed bytes input for the supplied {@link DataInput}.
+   * @param in data input object
+   * @return typed bytes input corresponding to the supplied {@link DataInput}.
+   */
+  public static TypedBytesInput get(DataInput in) {
+    TypedBytesInput bin = (TypedBytesInput) tbIn.get();
+    bin.setDataInput(in);
+    return bin;
+  }
+
+  /** Creates a new instance of TypedBytesInput. */
+  public TypedBytesInput(DataInput in) {
+    this.in = in;
+  }
+
+  /**
+   * Reads a typed bytes sequence and converts it to a Java object. The first 
+   * byte is interpreted as a type code, and then the right number of 
+   * subsequent bytes are read depending on the obtained type.
+   * @return the obtained object or null when the end of the file is reached
+   * @throws IOException
+   */
+  public Object read() throws IOException {
+    int code = 1;
+    try {
+      code = in.readUnsignedByte();
+    } catch (EOFException eof) {
+      return null;
+    }
+    if (code == Type.BYTES.code) {
+      return new Buffer(readBytes());
+    } else if (code == Type.BYTE.code) {
+      return readByte();
+    } else if (code == Type.BOOL.code) {
+      return readBool();
+    } else if (code == Type.INT.code) {
+      return readInt();
+    } else if (code == Type.SHORT.code) {
+      return readShort();
+    } else if (code == Type.LONG.code) {
+      return readLong();
+    } else if (code == Type.FLOAT.code) {
+      return readFloat();
+    } else if (code == Type.DOUBLE.code) {
+      return readDouble();
+    } else if (code == Type.STRING.code) {
+      return readString();
+    } else if (code == Type.VECTOR.code) {
+      return readVector();
+    } else if (code == Type.LIST.code) {
+      return readList();
+    } else if (code == Type.MAP.code) {
+      return readMap();
+    } else if (code == Type.MARKER.code) {
+      return null;
+    } else if (50 <= code && code <= 200) { // application-specific typecodes
+      return new Buffer(readBytes());
+    } else {
+      throw new RuntimeException("unknown type");
+    }
+  }
+
+  /**
+   * Reads a typed bytes sequence. The first byte is interpreted as a type code,
+   * and then the right number of subsequent bytes are read depending on the
+   * obtained type.
+   * 
+   * @return the obtained typed bytes sequence or null when the end of the file
+   *         is reached
+   * @throws IOException
+   */
+  public byte[] readRaw() throws IOException {
+    int code = -1;
+    try {
+      code = in.readUnsignedByte();
+    } catch (EOFException eof) {
+      return null;
+    }
+    if (code == Type.BYTES.code) {
+      return readRawBytes();
+    } else if (code == Type.BYTE.code) {
+      return readRawByte();
+    } else if (code == Type.BOOL.code) {
+      return readRawBool();
+    } else if (code == Type.INT.code) {
+      return readRawInt();
+    } else if (code == Type.LONG.code) {
+      return readRawLong();
+    } else if (code == Type.FLOAT.code) {
+      return readRawFloat();
+    } else if (code == Type.DOUBLE.code) {
+      return readRawDouble();
+    } else if (code == Type.STRING.code) {
+      return readRawString();
+    } else if (code == Type.VECTOR.code) {
+      return readRawVector();
+    } else if (code == Type.LIST.code) {
+      return readRawList();
+    } else if (code == Type.MAP.code) {
+      return readRawMap();
+    } else if (code == Type.MARKER.code) {
+      return null;
+    } else if (50 <= code && code <= 200) { // application-specific typecodes
+      return readRawBytes();
+    } else {
+      throw new RuntimeException("unknown type");
+    }
+  }
+
+  /**
+   * Reads a type byte and returns the corresponding {@link Type}.
+   * @return the obtained Type or null when the end of the file is reached
+   * @throws IOException
+   */
+  public Type readType() throws IOException {
+    int code = -1;
+    try {
+      code = in.readUnsignedByte();
+    } catch (EOFException eof) {
+      return null;
+    }
+    for (Type type : Type.values()) {
+      if (type.code == code) {
+        return type;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Skips a type byte.
+   * @return true iff the end of the file was not reached
+   * @throws IOException
+   */
+  public boolean skipType() throws IOException {
+    try {
+      in.readByte();
+      return true;
+    } catch (EOFException eof) {
+      return false;
+    }
+  }
+
+  /**
+   * Reads the bytes following a <code>Type.BYTES</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readBytes() throws IOException {
+    int length = in.readInt();
+    byte[] bytes = new byte[length];
+    in.readFully(bytes);
+    return bytes;
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.BYTES</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawBytes() throws IOException {
+    int length = in.readInt();
+    byte[] bytes = new byte[5 + length];
+    bytes[0] = (byte) Type.BYTES.code;
+    bytes[1] = (byte) (0xff & (length >> 24));
+    bytes[2] = (byte) (0xff & (length >> 16));
+    bytes[3] = (byte) (0xff & (length >> 8));
+    bytes[4] = (byte) (0xff & length);
+    in.readFully(bytes, 5, length);
+    return bytes;
+  }
+
+  /**
+   * Reads the byte following a <code>Type.BYTE</code> code.
+   * @return the obtained byte
+   * @throws IOException
+   */
+  public byte readByte() throws IOException {
+    return in.readByte();
+  }
+
+  /**
+   * Reads the raw byte following a <code>Type.BYTE</code> code.
+   * @return the obtained byte
+   * @throws IOException
+   */
+  public byte[] readRawByte() throws IOException {
+    byte[] bytes = new byte[2];
+    bytes[0] = (byte) Type.BYTE.code;
+    in.readFully(bytes, 1, 1);
+    return bytes;
+  }
+
+  /**
+   * Reads the boolean following a <code>Type.BOOL</code> code.
+   * @return the obtained boolean
+   * @throws IOException
+   */
+  public boolean readBool() throws IOException {
+    return in.readBoolean();
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.BOOL</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawBool() throws IOException {
+    byte[] bytes = new byte[2];
+    bytes[0] = (byte) Type.BOOL.code;
+    in.readFully(bytes, 1, 1);
+    return bytes;
+  }
+
+  /**
+   * Reads the integer following a <code>Type.INT</code> code.
+   * @return the obtained integer
+   * @throws IOException
+   */
+  public int readInt() throws IOException {
+    return in.readInt();
+  }
+
+  /**
+   * Reads the short following a <code>Type.SHORT</code> code.
+   * @return the obtained short
+   * @throws IOException
+   */
+  public short readShort() throws IOException {
+    return in.readShort();
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.INT</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawInt() throws IOException {
+    byte[] bytes = new byte[5];
+    bytes[0] = (byte) Type.INT.code;
+    in.readFully(bytes, 1, 4);
+    return bytes;
+  }
+
+  /**
+   * Reads the long following a <code>Type.LONG</code> code.
+   * @return the obtained long
+   * @throws IOException
+   */
+  public long readLong() throws IOException {
+    return in.readLong();
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.LONG</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawLong() throws IOException {
+    byte[] bytes = new byte[9];
+    bytes[0] = (byte) Type.LONG.code;
+    in.readFully(bytes, 1, 8);
+    return bytes;
+  }
+
+  /**
+   * Reads the float following a <code>Type.FLOAT</code> code.
+   * @return the obtained float
+   * @throws IOException
+   */
+  public float readFloat() throws IOException {
+    return in.readFloat();
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.FLOAT</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawFloat() throws IOException {
+    byte[] bytes = new byte[5];
+    bytes[0] = (byte) Type.FLOAT.code;
+    in.readFully(bytes, 1, 4);
+    return bytes;
+  }
+
+  /**
+   * Reads the double following a <code>Type.DOUBLE</code> code.
+   * @return the obtained double
+   * @throws IOException
+   */
+  public double readDouble() throws IOException {
+    return in.readDouble();
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.DOUBLE</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawDouble() throws IOException {
+    byte[] bytes = new byte[9];
+    bytes[0] = (byte) Type.DOUBLE.code;
+    in.readFully(bytes, 1, 8);
+    return bytes;
+  }
+
+  /**
+   * Reads the string following a <code>Type.STRING</code> code.
+   * @return the obtained string
+   * @throws IOException
+   */
+  public String readString() throws IOException {
+    return WritableUtils.readString(in);
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.STRING</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawString() throws IOException {
+    int length = in.readInt();
+    byte[] bytes = new byte[5 + length];
+    bytes[0] = (byte) Type.STRING.code;
+    bytes[1] = (byte) (0xff & (length >> 24));
+    bytes[2] = (byte) (0xff & (length >> 16));
+    bytes[3] = (byte) (0xff & (length >> 8));
+    bytes[4] = (byte) (0xff & length);
+    in.readFully(bytes, 5, length);
+    return bytes;
+  }
+
+  /**
+   * Reads the vector following a <code>Type.VECTOR</code> code.
+   * @return the obtained vector
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public ArrayList readVector() throws IOException {
+    int length = readVectorHeader();
+    ArrayList result = new ArrayList(length);
+    for (int i = 0; i < length; i++) {
+      result.add(read());
+    }
+    return result;
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.VECTOR</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawVector() throws IOException {
+    Buffer buffer = new Buffer();
+    int length = readVectorHeader();
+    buffer.append(new byte[] {
+      (byte) Type.VECTOR.code,
+      (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)),
+      (byte) (0xff & (length >> 8)), (byte) (0xff & length)
+    });
+    for (int i = 0; i < length; i++) {
+      buffer.append(readRaw());
+    }
+    return buffer.get();
+  }
+
+  /**
+   * Reads the header following a <code>Type.VECTOR</code> code.
+   * @return the number of elements in the vector
+   * @throws IOException
+   */
+  public int readVectorHeader() throws IOException {
+    return in.readInt();
+  }
+
+  /**
+   * Reads the list following a <code>Type.LIST</code> code.
+   * @return the obtained list
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public List readList() throws IOException {
+    List list = new ArrayList();
+    Object obj = read();
+    while (obj != null) {
+      list.add(obj);
+      obj = read();
+    }
+    return list;
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.LIST</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawList() throws IOException {
+    Buffer buffer = new Buffer(new byte[] { (byte) Type.LIST.code });
+    byte[] bytes = readRaw();
+    while (bytes != null) {
+      buffer.append(bytes);
+      bytes = readRaw();
+    }
+    buffer.append(new byte[] { (byte) Type.MARKER.code });
+    return buffer.get();
+  }
+
+  /**
+   * Reads the map following a <code>Type.MAP</code> code.
+   * @return the obtained map
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public TreeMap readMap() throws IOException {
+    int length = readMapHeader();
+    TreeMap result = new TreeMap();
+    for (int i = 0; i < length; i++) {
+      Object key = read();
+      Object value = read();
+      result.put(key, value);
+    }
+    return result;
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.MAP</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawMap() throws IOException {
+    Buffer buffer = new Buffer();
+    int length = readMapHeader();
+    buffer.append(new byte[] {
+      (byte) Type.MAP.code,
+      (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)),
+      (byte) (0xff & (length >> 8)), (byte) (0xff & length)
+    });
+    for (int i = 0; i < length; i++) {
+      buffer.append(readRaw());
+      buffer.append(readRaw());
+    }
+    return buffer.get();
+  }
+
+  /**
+   * Reads the header following a <code>Type.MAP</code> code.
+   * @return the number of key-value pairs in the map
+   * @throws IOException
+   */
+  public int readMapHeader() throws IOException {
+    return in.readInt();
+  }
+
+}
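
A minimal decoding sketch (illustrative class name and bytes): read() maps each type code to a plain Java object (Buffer, Integer, String, ArrayList, TreeMap, ...) and returns null once the stream is exhausted.

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesInput;

    public class ReadDemo {
      public static void main(String[] args) throws IOException {
        byte[] encoded = {3, 0, 0, 0, 42};  // Type.INT.code + big-endian 42
        TypedBytesInput in = TypedBytesInput.get(
            new DataInputStream(new ByteArrayInputStream(encoded)));
        System.out.println(in.read());      // 42
        System.out.println(in.read());      // null: end of stream
      }
    }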

Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesOutput.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesOutput.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesOutput.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesOutput.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.util.typedbytes;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.record.Buffer;
+
+/**
+ * Provides functionality for writing typed bytes.
+ */
+public class TypedBytesOutput {
+
+  private DataOutput out;
+
+  private TypedBytesOutput() {}
+
+  private void setDataOutput(DataOutput out) {
+    this.out = out;
+  }
+
+  private static ThreadLocal tbOut = new ThreadLocal() {
+    protected synchronized Object initialValue() {
+      return new TypedBytesOutput();
+    }
+  };
+
+  /**
+   * Get a thread-local typed bytes output for the supplied {@link DataOutput}.
+   * 
+   * @param out data output object
+   * @return typed bytes output corresponding to the supplied 
+   * {@link DataOutput}.
+   */
+  public static TypedBytesOutput get(DataOutput out) {
+    TypedBytesOutput bout = (TypedBytesOutput) tbOut.get();
+    bout.setDataOutput(out);
+    return bout;
+  }
+
+  /** Creates a new instance of TypedBytesOutput. */
+  public TypedBytesOutput(DataOutput out) {
+    this.out = out;
+  }
+  
+  /**
+   * Writes a Java object as a typed bytes sequence.
+   * 
+   * @param obj the object to be written
+   * @throws IOException
+   */
+  public void write(Object obj) throws IOException {
+    if (obj instanceof Buffer) {
+      writeBytes(((Buffer) obj).get());
+    } else if (obj instanceof Byte) {
+      writeByte((Byte) obj);
+    } else if (obj instanceof Boolean) {
+      writeBool((Boolean) obj);
+    } else if (obj instanceof Integer) {
+      writeInt((Integer) obj);
+    } else if (obj instanceof Long) {
+      writeLong((Long) obj);
+    } else if (obj instanceof Float) {
+      writeFloat((Float) obj);
+    } else if (obj instanceof Double) {
+      writeDouble((Double) obj);
+    } else if (obj instanceof String) {
+      writeString((String) obj);
+    } else if (obj instanceof ArrayList) {
+      writeVector((ArrayList) obj);
+    } else if (obj instanceof List) {
+      writeList((List) obj);
+    } else if (obj instanceof Map) {
+      writeMap((Map) obj);
+    } else {
+      throw new RuntimeException("cannot write objects of this type");
+    }
+  }
+
+  /**
+   * Writes a raw sequence of typed bytes.
+   * 
+   * @param bytes the bytes to be written
+   * @throws IOException
+   */
+  public void writeRaw(byte[] bytes) throws IOException {
+    out.write(bytes);
+  }
+
+  /**
+   * Writes a raw sequence of typed bytes.
+   * 
+   * @param bytes the bytes to be written
+   * @param offset an offset in the given array
+   * @param length number of bytes from the given array to write
+   * @throws IOException
+   */
+  public void writeRaw(byte[] bytes, int offset, int length)
+    throws IOException {
+    out.write(bytes, offset, length);
+  }
+
+  /**
+   * Writes a bytes array as a typed bytes sequence, using a given typecode.
+   * 
+   * @param bytes the bytes array to be written
+   * @param code the typecode to use
+   * @throws IOException
+   */
+  public void writeBytes(byte[] bytes, int code) throws IOException {
+    out.write(code);
+    out.writeInt(bytes.length);
+    out.write(bytes);
+  }
+  
+  /**
+   * Writes a bytes array as a typed bytes sequence.
+   * 
+   * @param bytes the bytes array to be written
+   * @throws IOException
+   */
+  public void writeBytes(byte[] bytes) throws IOException {
+    writeBytes(bytes, Type.BYTES.code);
+  }
+
+  /**
+   * Writes a byte as a typed bytes sequence.
+   * 
+   * @param b the byte to be written
+   * @throws IOException
+   */
+  public void writeByte(byte b) throws IOException {
+    out.write(Type.BYTE.code);
+    out.write(b);
+  }
+
+  /**
+   * Writes a boolean as a typed bytes sequence.
+   * 
+   * @param b the boolean to be written
+   * @throws IOException
+   */
+  public void writeBool(boolean b) throws IOException {
+    out.write(Type.BOOL.code);
+    out.writeBoolean(b);
+  }
+
+  /**
+   * Writes an integer as a typed bytes sequence.
+   * 
+   * @param i the integer to be written
+   * @throws IOException
+   */
+  public void writeInt(int i) throws IOException {
+    out.write(Type.INT.code);
+    out.writeInt(i);
+  }
+
+  /**
+   * Writes a long as a typed bytes sequence.
+   * 
+   * @param l the long to be written
+   * @throws IOException
+   */
+  public void writeLong(long l) throws IOException {
+    out.write(Type.LONG.code);
+    out.writeLong(l);
+  }
+
+  /**
+   * Writes a float as a typed bytes sequence.
+   * 
+   * @param f the float to be written
+   * @throws IOException
+   */
+  public void writeFloat(float f) throws IOException {
+    out.write(Type.FLOAT.code);
+    out.writeFloat(f);
+  }
+
+  /**
+   * Writes a double as a typed bytes sequence.
+   * 
+   * @param d the double to be written
+   * @throws IOException
+   */
+  public void writeDouble(double d) throws IOException {
+    out.write(Type.DOUBLE.code);
+    out.writeDouble(d);
+  }
+  
+  /**
+   * Writes a short as a typed bytes sequence.
+   * 
+   * @param s the short to be written
+   * @throws IOException
+   */
+  public void writeShort(short s) throws IOException {
+    out.write(Type.SHORT.code);
+    out.writeShort(s);
+  }
+
+  /**
+   * Writes a string as a typed bytes sequence.
+   * 
+   * @param s the string to be written
+   * @throws IOException
+   */
+  public void writeString(String s) throws IOException {
+    out.write(Type.STRING.code);
+    WritableUtils.writeString(out, s);
+  }
+
+  /**
+   * Writes a vector as a typed bytes sequence.
+   * 
+   * @param vector the vector to be written
+   * @throws IOException
+   */
+  public void writeVector(ArrayList vector) throws IOException {
+    writeVectorHeader(vector.size());
+    for (Object obj : vector) {
+      write(obj);
+    }
+  }
+
+  /**
+   * Writes a vector header.
+   * 
+   * @param length the number of elements in the vector
+   * @throws IOException
+   */
+  public void writeVectorHeader(int length) throws IOException {
+    out.write(Type.VECTOR.code);
+    out.writeInt(length);
+  }
+
+  /**
+   * Writes a list as a typed bytes sequence.
+   * 
+   * @param list the list to be written
+   * @throws IOException
+   */
+  public void writeList(List list) throws IOException {
+    writeListHeader();
+    for (Object obj : list) {
+      write(obj);
+    }
+    writeListFooter();
+  }
+
+  /**
+   * Writes a list header.
+   * 
+   * @throws IOException
+   */
+  public void writeListHeader() throws IOException {
+    out.write(Type.LIST.code);
+  }
+
+  /**
+   * Writes a list footer.
+   * 
+   * @throws IOException
+   */
+  public void writeListFooter() throws IOException {
+    out.write(Type.MARKER.code);
+  }
+
+  /**
+   * Writes a map as a typed bytes sequence.
+   * 
+   * @param map the map to be written
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public void writeMap(Map map) throws IOException {
+    writeMapHeader(map.size());
+    Set<Entry> entries = map.entrySet();
+    for (Entry entry : entries) {
+      write(entry.getKey());
+      write(entry.getValue());
+    }
+  }
+
+  /**
+   * Writes a map header.
+   * 
+   * @param length the number of key-value pairs in the map
+   * @throws IOException
+   */
+  public void writeMapHeader(int length) throws IOException {
+    out.write(Type.MAP.code);
+    out.writeInt(length);
+  }
+  
+  public void writeEndOfRecord() throws IOException {
+    out.write(Type.ENDOFRECORD.code);
+  }
+
+}
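
Pairing the output class with TypedBytesInput gives a round trip; write(Object) dispatches on the runtime type. A small illustrative sketch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesInput;
    import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesOutput;

    public class RoundTripDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        TypedBytesOutput out = TypedBytesOutput.get(new DataOutputStream(baos));
        out.write(42);        // Integer -> writeInt
        out.write("hello");   // String  -> writeString
        TypedBytesInput in = TypedBytesInput.get(
            new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
        System.out.println(in.read());  // 42
        System.out.println(in.read());  // hello
      }
    }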

Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordInput.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordInput.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordInput.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordInput.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.util.typedbytes;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.hadoop.record.Buffer;
+import org.apache.hadoop.record.Index;
+import org.apache.hadoop.record.RecordInput;
+
+/**
+ * Deserializer for records that reads typed bytes.
+ */
+public class TypedBytesRecordInput implements RecordInput {
+
+  private TypedBytesInput in;
+
+  private TypedBytesRecordInput() {}
+
+  private void setTypedBytesInput(TypedBytesInput in) {
+    this.in = in;
+  }
+
+  private static ThreadLocal tbIn = new ThreadLocal() {
+    protected synchronized Object initialValue() {
+      return new TypedBytesRecordInput();
+    }
+  };
+
+  /**
+   * Get a thread-local typed bytes record input for the supplied
+   * {@link TypedBytesInput}.
+   * 
+   * @param in typed bytes input object
+   * @return typed bytes record input corresponding to the supplied
+   *         {@link TypedBytesInput}.
+   */
+  public static TypedBytesRecordInput get(TypedBytesInput in) {
+    TypedBytesRecordInput bin = (TypedBytesRecordInput) tbIn.get();
+    bin.setTypedBytesInput(in);
+    return bin;
+  }
+
+  /**
+   * Get a thread-local typed bytes record input for the supplied
+   * {@link DataInput}.
+   * 
+   * @param in data input object
+   * @return typed bytes record input corresponding to the supplied
+   *         {@link DataInput}.
+   */
+  public static TypedBytesRecordInput get(DataInput in) {
+    return get(TypedBytesInput.get(in));
+  }
+
+  /** Creates a new instance of TypedBytesRecordInput. */
+  public TypedBytesRecordInput(TypedBytesInput in) {
+    this.in = in;
+  }
+
+  /** Creates a new instance of TypedBytesRecordInput. */
+  public TypedBytesRecordInput(DataInput in) {
+    this(new TypedBytesInput(in));
+  }
+
+  public boolean readBool(String tag) throws IOException {
+    in.skipType();
+    return in.readBool();
+  }
+
+  public Buffer readBuffer(String tag) throws IOException {
+    in.skipType();
+    return new Buffer(in.readBytes());
+  }
+
+  public byte readByte(String tag) throws IOException {
+    in.skipType();
+    return in.readByte();
+  }
+
+  public double readDouble(String tag) throws IOException {
+    in.skipType();
+    return in.readDouble();
+  }
+
+  public float readFloat(String tag) throws IOException {
+    in.skipType();
+    return in.readFloat();
+  }
+
+  public int readInt(String tag) throws IOException {
+    in.skipType();
+    return in.readInt();
+  }
+
+  public long readLong(String tag) throws IOException {
+    in.skipType();
+    return in.readLong();
+  }
+
+  public String readString(String tag) throws IOException {
+    in.skipType();
+    return in.readString();
+  }
+
+  public void startRecord(String tag) throws IOException {
+    in.skipType();
+  }
+
+  public Index startVector(String tag) throws IOException {
+    in.skipType();
+    return new TypedBytesIndex(in.readVectorHeader());
+  }
+
+  public Index startMap(String tag) throws IOException {
+    in.skipType();
+    return new TypedBytesIndex(in.readMapHeader());
+  }
+
+  public void endRecord(String tag) throws IOException {}
+
+  public void endVector(String tag) throws IOException {}
+
+  public void endMap(String tag) throws IOException {}
+
+  private static final class TypedBytesIndex implements Index {
+    private int nelems;
+
+    private TypedBytesIndex(int nelems) {
+      this.nelems = nelems;
+    }
+
+    public boolean done() {
+      return (nelems <= 0);
+    }
+
+    public void incr() {
+      nelems--;
+    }
+  }
+
+}

Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordOutput.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordOutput.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordOutput.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordOutput.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.util.typedbytes;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+import org.apache.hadoop.record.Buffer;
+import org.apache.hadoop.record.Record;
+import org.apache.hadoop.record.RecordOutput;
+
+/**
+ * Serializer for records that writes typed bytes.
+ */
+public class TypedBytesRecordOutput implements RecordOutput {
+
+  private TypedBytesOutput out;
+
+  private TypedBytesRecordOutput() {}
+
+  private void setTypedBytesOutput(TypedBytesOutput out) {
+    this.out = out;
+  }
+
+  private static ThreadLocal tbOut = new ThreadLocal() {
+    protected synchronized Object initialValue() {
+      return new TypedBytesRecordOutput();
+    }
+  };
+
+  /**
+   * Get a thread-local typed bytes record output for the supplied
+   * {@link TypedBytesOutput}.
+   * 
+   * @param out typed bytes output object
+   * @return typed bytes record output corresponding to the supplied
+   *         {@link TypedBytesOutput}.
+   */
+  public static TypedBytesRecordOutput get(TypedBytesOutput out) {
+    TypedBytesRecordOutput bout = (TypedBytesRecordOutput) tbOut.get();
+    bout.setTypedBytesOutput(out);
+    return bout;
+  }
+
+  /**
+   * Get a thread-local typed bytes record output for the supplied
+   * {@link DataOutput}.
+   * 
+   * @param out data output object
+   * @return typed bytes record output corresponding to the supplied
+   *         {@link DataOutput}.
+   */
+  public static TypedBytesRecordOutput get(DataOutput out) {
+    return get(TypedBytesOutput.get(out));
+  }
+
+  /** Creates a new instance of TypedBytesRecordOutput. */
+  public TypedBytesRecordOutput(TypedBytesOutput out) {
+    this.out = out;
+  }
+
+  /** Creates a new instance of TypedBytesRecordOutput. */
+  public TypedBytesRecordOutput(DataOutput out) {
+    this(new TypedBytesOutput(out));
+  }
+
+  public void writeBool(boolean b, String tag) throws IOException {
+    out.writeBool(b);
+  }
+
+  public void writeBuffer(Buffer buf, String tag) throws IOException {
+    out.writeBytes(buf.get());
+  }
+
+  public void writeByte(byte b, String tag) throws IOException {
+    out.writeByte(b);
+  }
+
+  public void writeDouble(double d, String tag) throws IOException {
+    out.writeDouble(d);
+  }
+
+  public void writeFloat(float f, String tag) throws IOException {
+    out.writeFloat(f);
+  }
+
+  public void writeInt(int i, String tag) throws IOException {
+    out.writeInt(i);
+  }
+
+  public void writeLong(long l, String tag) throws IOException {
+    out.writeLong(l);
+  }
+
+  public void writeString(String s, String tag) throws IOException {
+    out.writeString(s);
+  }
+
+  public void startRecord(Record r, String tag) throws IOException {
+    out.writeListHeader();
+  }
+
+  public void startVector(ArrayList v, String tag) throws IOException {
+    out.writeVectorHeader(v.size());
+  }
+
+  public void startMap(TreeMap m, String tag) throws IOException {
+    out.writeMapHeader(m.size());
+  }
+
+  public void endRecord(Record r, String tag) throws IOException {
+    out.writeListFooter();
+  }
+
+  public void endVector(ArrayList v, String tag) throws IOException {}
+
+  public void endMap(TreeMap m, String tag) throws IOException {}
+
+}
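
The record input/output pair adapts Hadoop's record I/O interfaces to typed bytes; the tag arguments exist for interface compatibility and never reach the wire. An illustrative sketch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordInput;
    import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordOutput;

    public class RecordIODemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        TypedBytesRecordOutput out =
            TypedBytesRecordOutput.get(new DataOutputStream(baos));
        out.writeInt(7, "id");              // tag "id" is not serialized
        out.writeString("seven", "name");
        TypedBytesRecordInput in = TypedBytesRecordInput.get(
            new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
        System.out.println(in.readInt("id"));       // 7
        System.out.println(in.readString("name"));  // seven
      }
    }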

Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordReader.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordReader.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordReader.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.util.typedbytes;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer;
+import org.apache.hadoop.hive.ql.exec.RecordReader;
+
+public class TypedBytesRecordReader implements RecordReader {
+
+  private DataInputStream din;
+  private TypedBytesWritableInput tbIn;
+
+  NonSyncDataOutputBuffer barrStr = new NonSyncDataOutputBuffer();
+  TypedBytesWritableOutput tbOut = new TypedBytesWritableOutput(barrStr);
+
+  ArrayList<Writable> row = new ArrayList<Writable>(0);
+
+  public void initialize(InputStream in, Configuration conf) throws IOException {
+    din = new DataInputStream(in);
+    tbIn = new TypedBytesWritableInput(din);
+  }
+
+  public Writable createRow() throws IOException {
+    BytesWritable retWrit = new BytesWritable();
+    return retWrit;
+  }
+
+  private Writable allocateWritable(Type type) {
+    switch (type) {
+    case BYTE:
+      return new ByteWritable();
+    case BOOL:
+      return new BooleanWritable();
+    case INT:
+      return new IntWritable();
+    case SHORT:
+      return new ShortWritable();
+    case LONG:
+      return new LongWritable();
+    case FLOAT:
+      return new FloatWritable();
+    case DOUBLE:
+      return new DoubleWritable();
+    case STRING:
+      return new Text();
+    default:
+      assert false; // not supported
+    }
+    return null;
+  }
+  
+  public int next(Writable data) throws IOException {
+    int pos = 0;
+    barrStr.reset();
+
+    while (true) {
+      Type type = tbIn.readTypeCode();
+      
+      // it was an empty stream
+      if (type == null)
+        return -1;
+      
+      if (type == Type.ENDOFRECORD) {
+        tbOut.writeEndOfRecord();
+        if (barrStr.getLength() > 0)
+          ((BytesWritable)data).set(barrStr.getData(), 0, barrStr.getLength());
+        return barrStr.getLength();
+      }
+    
+      if (pos >= row.size()) {
+        Writable wrt = allocateWritable(type);
+        assert pos == row.size();
+        row.add(wrt);
+      }
+     
+      switch (type) {
+        case BYTE: {
+          ByteWritable bw = (ByteWritable)row.get(pos);
+          tbIn.readByte(bw);
+          tbOut.writeByte(bw);
+          break;
+        }
+        case BOOL: {
+          BooleanWritable bw = (BooleanWritable)row.get(pos);
+          tbIn.readBoolean(bw);
+          tbOut.writeBoolean(bw);
+          break;
+        }
+        case INT: {
+          IntWritable iw = (IntWritable)row.get(pos);
+          tbIn.readInt(iw);
+          tbOut.writeInt(iw);
+          break;
+        }
+        case SHORT: {
+          ShortWritable sw = (ShortWritable)row.get(pos);
+          tbIn.readShort(sw);
+          tbOut.writeShort(sw);
+          break;
+        }        
+        case LONG: {
+          LongWritable lw = (LongWritable)row.get(pos);
+          tbIn.readLong(lw);
+          tbOut.writeLong(lw);
+          break;
+        }
+        case FLOAT: {
+          FloatWritable fw = (FloatWritable)row.get(pos);
+          tbIn.readFloat(fw);
+          tbOut.writeFloat(fw);
+          break;
+        }
+        case DOUBLE: {
+          DoubleWritable dw = (DoubleWritable)row.get(pos);
+          tbIn.readDouble(dw);
+          tbOut.writeDouble(dw);
+          break;
+        }
+        case STRING: {
+          Text txt = (Text)row.get(pos);
+          tbIn.readText(txt);
+          tbOut.writeText(txt);
+          break;
+        }
+        default:
+          assert false;  // should never come here
+      }
+    
+      pos++;
+    }
+  }
+  
+  public void close() throws IOException {
+    if (din != null)
+      din.close();
+  }
+}
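
A usage sketch for the reader: it consumes a script's stdout as a stream of typed bytes values, with each row closed by an ENDOFRECORD marker, and re-encodes the row into the BytesWritable obtained from createRow(). The input bytes below are hand-built for illustration:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordReader;

    public class ReaderDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.write(3);
        out.writeInt(42);    // one INT column
        out.write(100);      // Type.ENDOFRECORD closes the row
        TypedBytesRecordReader reader = new TypedBytesRecordReader();
        reader.initialize(new ByteArrayInputStream(baos.toByteArray()),
            new Configuration());
        BytesWritable row = (BytesWritable) reader.createRow();
        System.out.println(reader.next(row));  // > 0: one row read
        System.out.println(reader.next(row));  // -1: stream exhausted
        reader.close();
      }
    }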

Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordWriter.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordWriter.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordWriter.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.util.typedbytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.hive.ql.exec.RecordWriter;
+
+public class TypedBytesRecordWriter implements RecordWriter {
+
+  private OutputStream out;
+
+  public void initialize(OutputStream out, Configuration conf) throws IOException {
+    this.out = out;
+  }
+
+  public void write(Writable row) throws IOException {
+    BytesWritable brow = (BytesWritable)row;
+    out.write(brow.get(), 0, brow.getSize());
+  }
+
+  public void close() throws IOException {
+    out.flush();
+    out.close();
+  }
+}
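
The writer is the mirror image: it copies each already-serialized BytesWritable row (as produced by TypedBytesSerDe) onto the script's stdin. An illustrative sketch, with a byte buffer standing in for the stdin stream:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordWriter;

    public class WriterDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream stdin = new ByteArrayOutputStream();
        TypedBytesRecordWriter writer = new TypedBytesRecordWriter();
        writer.initialize(stdin, new Configuration());
        writer.write(new BytesWritable(new byte[] {3, 0, 0, 0, 42}));  // INT 42
        writer.close();  // flushes and closes the stream
      }
    }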

Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritable.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritable.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritable.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.util.typedbytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+ * Writable for typed bytes.
+ */
+public class TypedBytesWritable extends BytesWritable {
+
+  /** Create a TypedBytesWritable. */
+  public TypedBytesWritable() {
+    super();
+  }
+
+  /** Create a TypedBytesWritable with a given byte array as initial value. */
+  public TypedBytesWritable(byte[] bytes) {
+    super(bytes);
+  }
+
+  /** Set the typed bytes from a given Java object. */
+  public void setValue(Object obj) {
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      TypedBytesOutput tbo = TypedBytesOutput.get(new DataOutputStream(baos));
+      tbo.write(obj);
+      byte[] bytes = baos.toByteArray();
+      set(bytes, 0, bytes.length);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Get the typed bytes as a Java object. */
+  public Object getValue() {
+    try {
+      ByteArrayInputStream bais = new ByteArrayInputStream(get());
+      TypedBytesInput tbi = TypedBytesInput.get(new DataInputStream(bais));
+      Object obj = tbi.read();
+      return obj;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Get the type code embedded in the first byte. */
+  public Type getType() {
+    byte[] bytes = get();
+    if (bytes == null || bytes.length == 0) {
+      return null;
+    }
+    for (Type type : Type.values()) {
+      if (type.code == (int) bytes[0]) {
+        return type;
+      }
+    }
+    return null;
+  }
+
+  /** Generate a suitable string representation. */
+  public String toString() {
+    return getValue().toString();
+  }
+
+}

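A minimal round trip through the new writable (the demo class name is
hypothetical): setValue serializes a Java object into typed bytes, getType
inspects the leading type code, and getValue decodes the object back.

import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesWritable;

public class TypedBytesWritableDemo {
  public static void main(String[] args) {
    TypedBytesWritable tbw = new TypedBytesWritable();
    tbw.setValue("hello");               // encoded with a STRING type code
    System.out.println(tbw.getType());   // STRING
    System.out.println(tbw.getValue());  // hello
  }
}
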
Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableInput.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableInput.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableInput.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableInput.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,404 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.util.typedbytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SortedMapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Provides functionality for reading typed bytes as Writable objects.
+ * 
+ * @see TypedBytesInput
+ */
+public class TypedBytesWritableInput implements Configurable {
+
+  private TypedBytesInput in;
+  private Configuration conf;
+
+  private TypedBytesWritableInput() {
+    conf = new Configuration();
+  }
+
+  private void setTypedBytesInput(TypedBytesInput in) {
+    this.in = in;
+  }
+
+  private static ThreadLocal tbIn = new ThreadLocal() {
+    protected synchronized Object initialValue() {
+      return new TypedBytesWritableInput();
+    }
+  };
+
+  /**
+   * Get a thread-local typed bytes writable input for the supplied
+   * {@link TypedBytesInput}.
+   * 
+   * @param in typed bytes input object
+   * @return typed bytes writable input corresponding to the supplied
+   *         {@link TypedBytesInput}.
+   */
+  public static TypedBytesWritableInput get(TypedBytesInput in) {
+    TypedBytesWritableInput bin = (TypedBytesWritableInput) tbIn.get();
+    bin.setTypedBytesInput(in);
+    return bin;
+  }
+
+  /**
+   * Get a thread-local typed bytes writable input for the supplied
+   * {@link DataInput}.
+   * 
+   * @param in data input object
+   * @return typed bytes writable input corresponding to the supplied
+   *         {@link DataInput}.
+   */
+  public static TypedBytesWritableInput get(DataInput in) {
+    return get(TypedBytesInput.get(in));
+  }
+
+  /** Creates a new instance of TypedBytesWritableInput. */
+  public TypedBytesWritableInput(TypedBytesInput in) {
+    this();
+    this.in = in;
+  }
+
+  /** Creates a new instance of TypedBytesWritableInput. */
+  public TypedBytesWritableInput(DataInput din) {
+    this(new TypedBytesInput(din));
+  }
+
+  public Writable read() throws IOException {
+    Type type = in.readType();
+    if (type == null) {
+      return null;
+    }
+    switch (type) {
+    case BYTES:
+      return readBytes();
+    case BYTE:
+      return readByte();
+    case BOOL:
+      return readBoolean();
+    case INT:
+      return readVInt();
+    case SHORT:
+      return readShort();
+    case LONG:
+      return readVLong();
+    case FLOAT:
+      return readFloat();
+    case DOUBLE:
+      return readDouble();
+    case STRING:
+      return readText();
+    case VECTOR:
+      return readArray();
+    case MAP:
+      return readMap();
+    case WRITABLE:
+      return readWritable();
+    case ENDOFRECORD:
+      return null;
+    default:
+      throw new RuntimeException("unknown type");
+    }
+  }
+
+  public Type readTypeCode() throws IOException {
+    return in.readType();
+  }
+  
+  public Class<? extends Writable> readType() throws IOException {
+    Type type = in.readType();
+    if (type == null) {
+      return null;
+    }
+    switch (type) {
+    case BYTES:
+      return BytesWritable.class;
+    case BYTE:
+      return ByteWritable.class;
+    case BOOL:
+      return BooleanWritable.class;
+    case INT:
+      return VIntWritable.class;
+    case SHORT:
+      return ShortWritable.class;
+    case LONG:
+      return VLongWritable.class;
+    case FLOAT:
+      return FloatWritable.class;
+    case DOUBLE:
+      return DoubleWritable.class;
+    case STRING:
+      return Text.class;
+    case VECTOR:
+      return ArrayWritable.class;
+    case MAP:
+      return MapWritable.class;
+    case WRITABLE:
+      return Writable.class;
+    case ENDOFRECORD:
+      return null;
+    default:
+      throw new RuntimeException("unknown type");
+    }
+  }
+
+  public BytesWritable readBytes(BytesWritable bw) throws IOException {
+    byte[] bytes = in.readBytes();
+    if (bw == null) {
+      bw = new BytesWritable(bytes);
+    } else {
+      bw.set(bytes, 0, bytes.length);
+    }
+    return bw;
+  }
+
+  public BytesWritable readBytes() throws IOException {
+    return readBytes(null);
+  }
+
+  public ByteWritable readByte(ByteWritable bw) throws IOException {
+    if (bw == null) {
+      bw = new ByteWritable();
+    }
+    bw.set(in.readByte());
+    return bw;
+  }
+
+  public ByteWritable readByte() throws IOException {
+    return readByte(null);
+  }
+
+  public BooleanWritable readBoolean(BooleanWritable bw) throws IOException {
+    if (bw == null) {
+      bw = new BooleanWritable();
+    }
+    bw.set(in.readBool());
+    return bw;
+  }
+
+  public BooleanWritable readBoolean() throws IOException {
+    return readBoolean(null);
+  }
+
+  public IntWritable readInt(IntWritable iw) throws IOException {
+    if (iw == null) {
+      iw = new IntWritable();
+    }
+    iw.set(in.readInt());
+    return iw;
+  }
+
+  public IntWritable readInt() throws IOException {
+    return readInt(null);
+  }
+
+  public ShortWritable readShort(ShortWritable sw) throws IOException {
+    if (sw == null) {
+      sw = new ShortWritable();
+    }
+    sw.set(in.readShort());
+    return sw;
+  }
+
+  public ShortWritable readShort() throws IOException {
+    return readShort(null);
+  }
+  
+  public VIntWritable readVInt(VIntWritable iw) throws IOException {
+    if (iw == null) {
+      iw = new VIntWritable();
+    }
+    iw.set(in.readInt());
+    return iw;
+  }
+
+  public VIntWritable readVInt() throws IOException {
+    return readVInt(null);
+  }
+
+  public LongWritable readLong(LongWritable lw) throws IOException {
+    if (lw == null) {
+      lw = new LongWritable();
+    }
+    lw.set(in.readLong());
+    return lw;
+  }
+
+  public LongWritable readLong() throws IOException {
+    return readLong(null);
+  }
+
+  public VLongWritable readVLong(VLongWritable lw) throws IOException {
+    if (lw == null) {
+      lw = new VLongWritable();
+    }
+    lw.set(in.readLong());
+    return lw;
+  }
+
+  public VLongWritable readVLong() throws IOException {
+    return readVLong(null);
+  }
+
+  public FloatWritable readFloat(FloatWritable fw) throws IOException {
+    if (fw == null) {
+      fw = new FloatWritable();
+    }
+    fw.set(in.readFloat());
+    return fw;
+  }
+
+  public FloatWritable readFloat() throws IOException {
+    return readFloat(null);
+  }
+
+  public DoubleWritable readDouble(DoubleWritable dw) throws IOException {
+    if (dw == null) {
+      dw = new DoubleWritable();
+    }
+    dw.set(in.readDouble());
+    return dw;
+  }
+
+  public DoubleWritable readDouble() throws IOException {
+    return readDouble(null);
+  }
+
+  public Text readText(Text t) throws IOException {
+    if (t == null) {
+      t = new Text();
+    }
+    t.set(in.readString());
+    return t;
+  }
+
+  public Text readText() throws IOException {
+    return readText(null);
+  }
+
+  public ArrayWritable readArray(ArrayWritable aw) throws IOException {
+    if (aw == null) {
+      aw = new ArrayWritable(TypedBytesWritable.class);
+    } else if (!aw.getValueClass().equals(TypedBytesWritable.class)) {
+      throw new RuntimeException("value class has to be TypedBytesWritable");
+    }
+    int length = in.readVectorHeader();
+    Writable[] writables = new Writable[length];
+    for (int i = 0; i < length; i++) {
+      writables[i] = new TypedBytesWritable(in.readRaw());
+    }
+    aw.set(writables);
+    return aw;
+  }
+
+  public ArrayWritable readArray() throws IOException {
+    return readArray(null);
+  }
+
+  public MapWritable readMap(MapWritable mw) throws IOException {
+    if (mw == null) {
+      mw = new MapWritable();
+    }
+    int length = in.readMapHeader();
+    for (int i = 0; i < length; i++) {
+      Writable key = read();
+      Writable value = read();
+      mw.put(key, value);
+    }
+    return mw;
+  }
+
+  public MapWritable readMap() throws IOException {
+    return readMap(null);
+  }
+
+  public SortedMapWritable readSortedMap(SortedMapWritable mw)
+    throws IOException {
+    if (mw == null) {
+      mw = new SortedMapWritable();
+    }
+    int length = in.readMapHeader();
+    for (int i = 0; i < length; i++) {
+      WritableComparable key = (WritableComparable) read();
+      Writable value = read();
+      mw.put(key, value);
+    }
+    return mw;
+  }
+
+  public SortedMapWritable readSortedMap() throws IOException {
+    return readSortedMap(null);
+  }
+  
+  public Writable readWritable(Writable writable) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(in.readBytes());
+    DataInputStream dis = new DataInputStream(bais);
+    String className = WritableUtils.readString(dis);
+    if (writable == null) {
+      try {
+        Class<? extends Writable> cls = 
+          conf.getClassByName(className).asSubclass(Writable.class);
+        writable = (Writable) ReflectionUtils.newInstance(cls, conf);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+    } else if (!writable.getClass().getName().equals(className)) {
+      throw new IOException("wrong Writable class given");
+    }
+    writable.readFields(dis);
+    return writable;
+  }
+
+  public Writable readWritable() throws IOException {
+    return readWritable(null);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+  
+}

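To see the type-to-Writable mapping above in action, a small sketch (the demo
class name is hypothetical; writeString and writeInt are used as in the rest
of this patch) encodes two values with TypedBytesOutput and reads them back.
Per the switch in read(), STRING decodes to Text and INT to VIntWritable:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesOutput;
import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesWritableInput;

public class ReadDemo {
  public static void main(String[] args) throws IOException {
    // Encode a string and an int as typed bytes.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    TypedBytesOutput out = TypedBytesOutput.get(new DataOutputStream(baos));
    out.writeString("key");
    out.writeInt(7);

    // Decode them back as Writables.
    DataInputStream dis =
      new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    TypedBytesWritableInput in = TypedBytesWritableInput.get(dis);
    Writable first = in.read();   // Text("key")
    Writable second = in.read();  // VIntWritable(7)
    System.out.println(first + " / " + second);
  }
}
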
Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableOutput.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableOutput.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableOutput.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableOutput.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.util.typedbytes;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SortedMapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.record.Record;
+
+/**
+ * Provides functionality for writing Writable objects as typed bytes.
+ * 
+ * @see TypedBytesOutput
+ */
+public class TypedBytesWritableOutput {
+
+  private TypedBytesOutput out;
+
+  private TypedBytesWritableOutput() {}
+
+  private void setTypedBytesOutput(TypedBytesOutput out) {
+    this.out = out;
+  }
+
+  private static ThreadLocal tbOut = new ThreadLocal() {
+    protected synchronized Object initialValue() {
+      return new TypedBytesWritableOutput();
+    }
+  };
+
+  /**
+   * Get a thread-local typed bytes writable output for the supplied
+   * {@link TypedBytesOutput}.
+   * 
+   * @param out typed bytes output object
+   * @return typed bytes writable output corresponding to the supplied
+   *         {@link TypedBytesOutput}.
+   */
+  public static TypedBytesWritableOutput get(TypedBytesOutput out) {
+    TypedBytesWritableOutput bout = (TypedBytesWritableOutput) tbOut.get();
+    bout.setTypedBytesOutput(out);
+    return bout;
+  }
+
+  /**
+   * Get a thread-local typed bytes writable output for the supplied
+   * {@link DataOutput}.
+   * 
+   * @param out data output object
+   * @return typed bytes writable output corresponding to the supplied
+   *         {@link DataOutput}.
+   */
+  public static TypedBytesWritableOutput get(DataOutput out) {
+    return get(TypedBytesOutput.get(out));
+  }
+
+  /** Creates a new instance of TypedBytesWritableOutput. */
+  public TypedBytesWritableOutput(TypedBytesOutput out) {
+    this();
+    this.out = out;
+  }
+
+  /** Creates a new instance of TypedBytesWritableOutput. */
+  public TypedBytesWritableOutput(DataOutput dout) {
+    this(new TypedBytesOutput(dout));
+  }
+
+  public void write(Writable w) throws IOException {
+    if (w instanceof TypedBytesWritable) {
+      writeTypedBytes((TypedBytesWritable) w);
+    } else if (w instanceof BytesWritable) {
+      writeBytes((BytesWritable) w);
+    } else if (w instanceof ByteWritable) {
+      writeByte((ByteWritable) w);
+    } else if (w instanceof BooleanWritable) {
+      writeBoolean((BooleanWritable) w);
+    } else if (w instanceof IntWritable) {
+      writeInt((IntWritable) w);
+    } else if (w instanceof VIntWritable) {
+      writeVInt((VIntWritable) w);
+    } else if (w instanceof LongWritable) {
+      writeLong((LongWritable) w);
+    } else if (w instanceof VLongWritable) {
+      writeVLong((VLongWritable) w);
+    } else if (w instanceof FloatWritable) {
+      writeFloat((FloatWritable) w);
+    } else if (w instanceof DoubleWritable) {
+      writeDouble((DoubleWritable) w);
+    } else if (w instanceof Text) {
+      writeText((Text) w);
+    } else if (w instanceof ShortWritable) {
+      writeShort((ShortWritable)w);
+    } else if (w instanceof ArrayWritable) {
+      writeArray((ArrayWritable) w);
+    } else if (w instanceof MapWritable) {
+      writeMap((MapWritable) w);
+    } else if (w instanceof SortedMapWritable) {
+      writeSortedMap((SortedMapWritable) w);
+    } else if (w instanceof Record) {
+      writeRecord((Record) w);
+    } else {
+      writeWritable(w); // last resort
+    }
+  }
+
+  public void writeTypedBytes(TypedBytesWritable tbw) throws IOException {
+    out.writeRaw(tbw.get(), 0, tbw.getSize());
+  }
+
+  public void writeBytes(BytesWritable bw) throws IOException {
+    byte[] bytes = Arrays.copyOfRange(bw.get(), 0, bw.getSize());
+    out.writeBytes(bytes);
+  }
+
+  public void writeByte(ByteWritable bw) throws IOException {
+    out.writeByte(bw.get());
+  }
+
+  public void writeBoolean(BooleanWritable bw) throws IOException {
+    out.writeBool(bw.get());
+  }
+
+  public void writeInt(IntWritable iw) throws IOException {
+    out.writeInt(iw.get());
+  }
+
+  public void writeVInt(VIntWritable viw) throws IOException {
+    out.writeInt(viw.get());
+  }
+
+  public void writeLong(LongWritable lw) throws IOException {
+    out.writeLong(lw.get());
+  }
+
+  public void writeVLong(VLongWritable vlw) throws IOException {
+    out.writeLong(vlw.get());
+  }
+
+  public void writeFloat(FloatWritable fw) throws IOException {
+    out.writeFloat(fw.get());
+  }
+
+  public void writeDouble(DoubleWritable dw) throws IOException {
+    out.writeDouble(dw.get());
+  }
+
+  public void writeShort(ShortWritable sw) throws IOException {
+    out.writeShort(sw.get());
+  }
+
+  public void writeText(Text t) throws IOException {
+    out.writeString(t.toString());
+  }
+
+  public void writeArray(ArrayWritable aw) throws IOException {
+    Writable[] writables = aw.get();
+    out.writeVectorHeader(writables.length);
+    for (Writable writable : writables) {
+      write(writable);
+    }
+  }
+
+  public void writeMap(MapWritable mw) throws IOException {
+    out.writeMapHeader(mw.size());
+    for (Map.Entry<Writable, Writable> entry : mw.entrySet()) {
+      write(entry.getKey());
+      write(entry.getValue());
+    }
+  }
+
+  public void writeSortedMap(SortedMapWritable smw) throws IOException {
+    out.writeMapHeader(smw.size());
+    for (Map.Entry<WritableComparable, Writable> entry : smw.entrySet()) {
+      write(entry.getKey());
+      write(entry.getValue());
+    }
+  }
+
+  public void writeRecord(Record r) throws IOException {
+    r.serialize(TypedBytesRecordOutput.get(out));
+  }
+
+  public void writeWritable(Writable w) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    WritableUtils.writeString(dos, w.getClass().getName());
+    w.write(dos);
+    dos.close();
+    out.writeBytes(baos.toByteArray(), Type.WRITABLE.code);
+  }
+
+  public void writeEndOfRecord() throws IOException {
+    out.writeEndOfRecord();
+  }
+}

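The output side pairs with TypedBytesWritableInput above. A short sketch (the
demo class name is hypothetical) shows the instanceof dispatch in write() and
the end-of-record marker that delimits rows for the record reader:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesWritableOutput;

public class WriteDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    TypedBytesWritableOutput out =
      TypedBytesWritableOutput.get(new DataOutputStream(baos));
    out.write(new Text("val_238"));   // dispatches to writeText
    out.write(new IntWritable(238));  // dispatches to writeInt
    out.writeEndOfRecord();           // row delimiter
    System.out.println(baos.size() + " payload bytes");
  }
}
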
Added: hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q (added)
+++ hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q Tue Sep  1 21:36:28 2009
@@ -0,0 +1,29 @@
+add jar ../build/contrib/hive_contrib.jar;
+
+drop table dest1;
+CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM (
+  FROM src
+  SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe' 
+  RECORDWRITER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordWriter'
+  USING '/bin/cat'
+  AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe' 
+  RECORDREADER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordReader'
+) tmap
+INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue;
+
+FROM (
+  FROM src
+  SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+  RECORDWRITER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordWriter'
+  USING '/bin/cat'
+  AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+  RECORDREADER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordReader'
+) tmap
+INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue;
+
+SELECT dest1.* FROM dest1;
+
+drop table dest1;

Modified: hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out?rev=810252&r1=810251&r2=810252&view=diff
==============================================================================
--- hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out (original)
+++ hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out Tue Sep  1 21:36:28 2009
@@ -4,13 +4,14 @@
 FROM (
   FROM src
   SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe' 
+  RECORDWRITER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordWriter'
   USING '/bin/cat'
   AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe' 
-  RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader'
+  RECORDREADER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordReader'
 ) tmap
 INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue
 ABSTRACT SYNTAX TREE:
-  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (. (TOK_TABLE_OR_COL src) key) (. (TOK_TABLE_OR_COL src) value)) (TOK_SERDE (TOK_SERDENAME 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe')) TOK_RECORDREADER '/bin/cat' (TOK_SERDE (TOK_SERDENAME 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe')) (TOK_RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader') (TOK_ALIASLIST tkey tvalue)))))) tmap)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB dest1)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL tkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL tvalue)))))
+  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (. (TOK_TABLE_OR_COL src) key) (. (TOK_TABLE_OR_COL src) value)) (TOK_SERDE (TOK_SERDENAME 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe')) (TOK_RECORDWRITER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordWriter') '/bin/cat' (TOK_SERDE (TOK_SERDENAME 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe')) (TOK_RECORDREADER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordReader') (TOK_ALIASLIST tkey tvalue)))))) tmap)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB dest1)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL tkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL tvalue)))))
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -65,10 +66,10 @@
           Move Operator
             files:
                 hdfs directory: true
-                destination: file:/data/users/njain/hive1/hive1/build/ql/tmp/1595378189/10000
+                destination: file:/data/users/njain/hive1/hive1/build/ql/tmp/2032306747/10000
           Map Reduce
             Alias -> Map Operator Tree:
-              file:/data/users/njain/hive1/hive1/build/ql/tmp/945395579/10002 
+              file:/data/users/njain/hive1/hive1/build/ql/tmp/90741617/10002 
                   Reduce Output Operator
                     sort order: 
                     Map-reduce partition columns:
@@ -105,16 +106,17 @@
 query: FROM (
   FROM src
   SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+  RECORDWRITER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordWriter'
   USING '/bin/cat'
   AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
-  RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader'
+  RECORDREADER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordReader'
 ) tmap
 INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue
 Input: default/src
 Output: default/dest1
 query: SELECT dest1.* FROM dest1
 Input: default/dest1
-Output: file:/data/users/njain/hive1/hive1/build/ql/tmp/1097288414/10000
+Output: file:/data/users/njain/hive1/hive1/build/ql/tmp/1184607913/10000
 238	val_238
 86	val_86
 311	val_311

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java?rev=810252&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java Tue Sep  1 21:36:28 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Counterpart of RecordReader: ScriptOperator uses a RecordWriter to feed
+ * serialized rows to the standard input of the user script.
+ */
+public interface RecordWriter {
+
+  public void initialize(OutputStream out, Configuration conf) throws IOException;
+  public void write(Writable row) throws IOException;
+  public void close() throws IOException;
+}

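A hedged sketch of a custom implementation (the class name is hypothetical,
not part of this patch): it reproduces the newline-delimited text behavior
that ScriptOperator previously hard-coded, as visible in the diff below.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hive.ql.exec.RecordWriter;

public class NewlineRecordWriter implements RecordWriter {

  private OutputStream out;

  public void initialize(OutputStream out, Configuration conf) throws IOException {
    this.out = out;
  }

  public void write(Writable row) throws IOException {
    Text text = (Text) row;   // assumes the input SerDe serializes rows to Text
    out.write(text.getBytes(), 0, text.getLength());
    out.write('\n');          // one row per line, as ScriptOperator did before
  }

  public void close() throws IOException {
    out.flush();
    out.close();
  }
}
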
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=810252&r1=810251&r2=810252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Tue Sep  1 21:36:28 2009
@@ -58,9 +58,6 @@
   transient private LongWritable deserialize_error_count = new LongWritable ();
   transient private LongWritable serialize_error_count = new LongWritable ();
 
-  transient DataOutputStream scriptOut;
-  transient DataInputStream scriptErr;
-  transient DataInputStream scriptIn;
   transient Thread outThread;
   transient Thread errThread;
   transient Process scriptPid;
@@ -70,6 +67,7 @@
   // Output from the script
   transient Deserializer scriptOutputDeserializer;
   transient volatile Throwable scriptError = null;
+  transient RecordWriter scriptOutWriter;
 
   /**
    * Timer to send periodic reports back to the tracker.
@@ -220,9 +218,12 @@
       env.put(safeEnvVarName(HiveConf.ConfVars.HIVEALIAS.varname), String.valueOf(alias));
       scriptPid = pb.start();       // Runtime.getRuntime().exec(wrappedCmdArgs);
 
-      scriptOut = new DataOutputStream(new BufferedOutputStream(scriptPid.getOutputStream()));
-      scriptIn = new DataInputStream(new BufferedInputStream(scriptPid.getInputStream()));
-      scriptErr = new DataInputStream(new BufferedInputStream(scriptPid.getErrorStream()));
+      DataOutputStream scriptOut = new DataOutputStream(new BufferedOutputStream(scriptPid.getOutputStream()));
+      DataInputStream  scriptIn = new DataInputStream(new BufferedInputStream(scriptPid.getInputStream()));
+      DataInputStream  scriptErr = new DataInputStream(new BufferedInputStream(scriptPid.getErrorStream()));
+
+      scriptOutWriter = conf.getInRecordWriterClass().newInstance();
+      scriptOutWriter.initialize(scriptOut, hconf);
       
       RecordReader scriptOutputReader = conf.getOutRecordReaderClass().newInstance();
       scriptOutputReader.initialize(scriptIn, hconf);
@@ -266,16 +267,15 @@
     }
   }
 
-  Text text = new Text();
   public void processOp(Object row, int tag) throws HiveException {
 
     if(scriptError != null) {
       throw new HiveException(scriptError);
     }
+
     try {
-      text = (Text) scriptInputSerializer.serialize(row, inputObjInspectors[tag]);
-      scriptOut.write(text.getBytes(), 0, text.getLength());
-      scriptOut.write(Utilities.newLineCode);
+      Writable res = scriptInputSerializer.serialize(row, inputObjInspectors[tag]);
+      scriptOutWriter.write(res);
     } catch (SerDeException e) {
       LOG.error("Error in serializing the row: " + e.getMessage());
       scriptError = e;
@@ -297,8 +297,7 @@
       }
       // everything ok. try normal shutdown
       try {
-        scriptOut.flush();
-        scriptOut.close();
+        scriptOutWriter.close();
         int exitVal = scriptPid.waitFor();
         if (exitVal != 0) {
           LOG.error("Script failed with code " + exitVal);
@@ -421,6 +420,7 @@
 
         while(true) {
           long bytes = in.next(row);
+
           if(bytes <= 0) {
             break;
           }