You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/08/24 20:23:22 UTC

svn commit: r807330 [1/3] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/apache/hadoop/hive/contrib/serde2/ contrib/src/test/queries/clientpositive/ contrib/src/test/results/clientpositive/ eclipse-t...

Author: zshao
Date: Mon Aug 24 18:23:19 2009
New Revision: 807330

URL: http://svn.apache.org/viewvc?rev=807330&view=rev
Log:
HIVE-708. Add TypedBytes SerDe for transform. (Namit Jain via zshao)

Added:
    hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java
    hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q
    hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesOutput.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesRecordInput.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesRecordOutput.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesWritable.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesWritableInput.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesWritableOutput.java
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/input38.q
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input38.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/eclipse-templates/.classpath
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
    hadoop/hive/trunk/ql/src/test/results/clientnegative/script_error.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input14.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input14_limit.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input17.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input18.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input20.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input33.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input34.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input35.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input36.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce7.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/nullscript.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_transform.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/regexp_extract.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/transform_ppr1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/transform_ppr2.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/parse/input20.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/parse/input4.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/parse/input5.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hadoop/hive/trunk/serde/if/serde.thrift
    hadoop/hive/trunk/serde/src/gen-java/org/apache/hadoop/hive/serde/Constants.java
    hadoop/hive/trunk/serde/src/gen-php/serde_constants.php
    hadoop/hive/trunk/serde/src/gen-py/org_apache_hadoop_hive_serde/constants.py
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/InputByteBuffer.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/OutputByteBuffer.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Aug 24 18:23:19 2009
@@ -23,6 +23,8 @@
 
     HIVE-187. Preliminary ODBC Support. (Eric Hwang via rmurthy)
 
+    HIVE-708. Add TypedBytes SerDe for transform. (Namit Jain via zshao)
+
   IMPROVEMENTS
 
     HIVE-760. Add version info to META-INF/MANIFEST.MF.

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Aug 24 18:23:19 2009
@@ -133,6 +133,10 @@
     //Location of Hive run time structured log file
     HIVEHISTORYFILELOC("hive.querylog.location",  "/tmp/"+System.getProperty("user.name")),
     
+    // Default serde and record reader for user scripts
+    HIVESCRIPTSERDE("hive.script.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
+    HIVESCRIPTRECORDREADER("hive.script.recordreader", "org.apache.hadoop.hive.ql.exec.TextRecordReader"),
+    
     // HWI
     HIVEHWILISTENHOST("hive.hwi.listen.host","0.0.0.0"),
     HIVEHWILISTENPORT("hive.hwi.listen.port","9999"),

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Mon Aug 24 18:23:19 2009
@@ -339,4 +339,16 @@
   <description>Whether Hive Tranform/Map/Reduce Clause should automatically send progress information to TaskTracker to avoid the task getting killed because of inactivity.  Hive sends progress information when the script is outputting to stderr.  This option removes the need of periodically producing stderr messages, but users should be cautious because this may prevent infinite loops in the scripts to be killed by TaskTracker.  </description>
 </property>
 
+<property>
+  <name>hive.script.serde</name>
+  <value>org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe</value>
+  <description>The default serde for trasmitting input data to and reading output data from the user scripts. </description>
+</property>
+
+<property>
+  <name>hive.script.recordreader</name>
+  <value>org.apache.hadoop.hive.ql.exec.TextRecordReader</value>
+  <description>The default record reader for reading data from the user scripts. </description>
+</property>
+
 </configuration>

Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.serde2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.util.typedbytes.Type;
+import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableInput;
+import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableOutput;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer;
+import org.apache.hadoop.hive.ql.io.NonSyncDataInputBuffer;
+
+/**
+ * TypedBytesSerDe uses typed bytes to serialize/deserialize.
+ * 
+ * More info on the typedbytes stuff that Dumbo uses.
+ * http://issues.apache.org/jira/browse/HADOOP-1722 
+ * A fast python decoder for this, which is apparently 25% faster than the python version is available at
+ * http://github.com/klbostee/ctypedbytes/tree/master 
+ */
+public class TypedBytesSerDe implements SerDe {
+
+  public static final Log LOG = LogFactory.getLog(TypedBytesSerDe.class.getName());
+  
+  int numColumns;
+  StructObjectInspector rowOI;
+  ArrayList<Object> row;
+ 
+  BytesWritable serializeBytesWritable;
+  NonSyncDataOutputBuffer barrStr;
+  TypedBytesWritableOutput tbOut;
+  
+  NonSyncDataInputBuffer inBarrStr;
+  TypedBytesWritableInput tbIn;
+  
+  List<String>   columnNames;
+  List<TypeInfo> columnTypes;
+  
+  @Override
+  public void initialize(Configuration conf, Properties tbl)
+      throws SerDeException {
+
+    // We can get the table definition from tbl.
+    serializeBytesWritable = new BytesWritable();
+    barrStr = new NonSyncDataOutputBuffer();
+    tbOut = new TypedBytesWritableOutput(barrStr);
+    
+    inBarrStr = new NonSyncDataInputBuffer();
+    tbIn = new TypedBytesWritableInput(inBarrStr);
+    
+    // Read the configuration parameters
+    String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
+    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+    columnNames = Arrays.asList(columnNameProperty.split(","));
+    columnTypes = new ArrayList<TypeInfo>();
+    List<String> columnTypeProps = Arrays.asList(columnTypeProperty.split(","));
+
+    for (String colType : columnTypeProps) {
+      columnTypes.add(TypeInfoUtils
+                      .getTypeInfoFromTypeString(colType));
+    }
+
+    assert columnNames.size() == columnTypes.size();
+    numColumns = columnNames.size(); 
+    
+    // All columns have to be primitive.
+    for (int c = 0; c < numColumns; c++) {
+      if (columnTypes.get(c).getCategory() != Category.PRIMITIVE) {
+        throw new SerDeException(getClass().getName() 
+            + " only accepts primitive columns, but column[" + c 
+            + "] named " + columnNames.get(c) + " has category "
+            + columnTypes.get(c).getCategory());
+      }
+    }
+    
+    // Constructing the row ObjectInspector:
+    // The row consists of some string columns, each column will be a java 
+    // String object.
+    List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(columnNames.size());
+    for (int c = 0; c < numColumns; c++) {
+      columnOIs.add(TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(columnTypes.get(c)));
+    }
+    
+    // StandardStruct uses ArrayList to store the row. 
+    rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
+    
+    // Constructing the row object, etc, which will be reused for all rows.
+    row = new ArrayList<Object>(numColumns);
+    for (int c = 0; c < numColumns; c++) {
+      row.add(null);
+    }
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return rowOI;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return BytesWritable.class;
+  }
+  
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+
+    BytesWritable data = (BytesWritable)blob;
+    inBarrStr.reset(data.get(), 0, data.getSize()-1);   
+    
+    try {
+
+      for (int i=0; i<columnNames.size(); i++) {
+        row.set(i, deserializeField(tbIn, columnTypes.get(i), row.get(i)));
+      }
+
+      // The next byte should be the marker
+      assert tbIn.readTypeCode() == Type.ENDOFRECORD;
+      
+    } catch (IOException e) {
+      throw new SerDeException(e);
+    }
+    
+    return row;
+  }
+
+  static Object deserializeField(TypedBytesWritableInput in, TypeInfo type, Object reuse) throws IOException {
+
+    // read the type
+    in.readType();
+
+    switch (type.getCategory()) {
+      case PRIMITIVE: {
+        PrimitiveTypeInfo ptype = (PrimitiveTypeInfo)type;
+        switch (ptype.getPrimitiveCategory()) {
+
+          case VOID: {
+            return null;
+          }
+
+          case BOOLEAN: {
+            BooleanWritable r = reuse == null ? new BooleanWritable() : (BooleanWritable)reuse;
+            r = (BooleanWritable)in.readBoolean(r);
+            return r;
+          }
+          case BYTE: {
+            ByteWritable r = reuse == null ? new ByteWritable() : (ByteWritable)reuse;
+            r = (ByteWritable)in.readByte(r);
+            return r;
+          }
+          case SHORT: {
+            ShortWritable r = reuse == null ? new ShortWritable() : (ShortWritable)reuse;
+            r = (ShortWritable)in.readShort(r);
+            return r;
+          }
+          case INT: {
+            IntWritable r = reuse == null ? new IntWritable() : (IntWritable)reuse;
+            r = (IntWritable)in.readInt(r);
+            return r;
+          }
+          case LONG: {
+            LongWritable r = reuse == null ? new LongWritable() : (LongWritable)reuse;
+            r = (LongWritable)in.readLong(r);
+            return r;
+          }
+          case FLOAT: {
+            FloatWritable r = reuse == null ? new FloatWritable() : (FloatWritable)reuse;
+            r = (FloatWritable)in.readFloat(r);
+            return r;
+          }
+          case DOUBLE: {
+            DoubleWritable r = reuse == null ? new DoubleWritable() : (DoubleWritable)reuse;
+            r = (DoubleWritable)in.readDouble(r);
+            return r;
+          }
+          case STRING: {
+            Text r = reuse == null ? new Text() : (Text)reuse;
+            r = (Text)in.readText(r);
+            return r;
+          }
+          default: {
+            throw new RuntimeException("Unrecognized type: " + ptype.getPrimitiveCategory());
+          }
+        }
+      }
+      // Currently, deserialization of complex types is not supported
+      case LIST: 
+      case MAP:
+      case STRUCT: 
+      default: {
+        throw new RuntimeException("Unsupported category: " + type.getCategory());
+      }
+    }
+  }
+
+
+  
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector)
+      throws SerDeException {
+    try {
+      barrStr.reset();
+      StructObjectInspector soi = (StructObjectInspector)objInspector;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    
+      for (int i = 0; i < numColumns; i++) {
+        Object o = soi.getStructFieldData(obj, fields.get(i));
+        ObjectInspector oi = fields.get(i).getFieldObjectInspector(); 
+        serializeField(o, oi, row.get(i));
+      }
+    
+      // End of the record is part of the data
+      tbOut.writeEndOfRecord();
+      
+      serializeBytesWritable.set(barrStr.getData(), 0, barrStr.getLength());
+    } catch (IOException e) {
+      throw new SerDeException(e.getMessage());
+    }
+    return serializeBytesWritable;
+  }
+   
+  private byte[] tmpByteArr = new byte[1];
+  
+  private void serializeField(Object o, ObjectInspector oi, Object reuse) throws IOException {
+    switch (oi.getCategory()) {
+      case PRIMITIVE: {
+        PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi;
+        switch (poi.getPrimitiveCategory()) {
+          case VOID: {
+            return;
+          }
+          case BOOLEAN: {
+            BooleanObjectInspector boi = (BooleanObjectInspector)poi;
+            BooleanWritable r = reuse == null ? new BooleanWritable() : (BooleanWritable)reuse;
+            r.set(boi.get(o));
+            tbOut.write(r);
+            return;
+          }
+          case BYTE: {
+            ByteObjectInspector boi = (ByteObjectInspector)poi;
+            BytesWritable r = reuse == null ? new BytesWritable() : (BytesWritable)reuse;
+            tmpByteArr[0] = boi.get(o);
+            r.set(tmpByteArr, 0, 1);
+            tbOut.write(r);
+            return;
+          }
+          case SHORT: {
+            ShortObjectInspector spoi = (ShortObjectInspector)poi;
+            ShortWritable r = reuse == null ? new ShortWritable() : (ShortWritable)reuse;
+            r.set(spoi.get(o));
+            tbOut.write(r);
+            return;
+          }
+          case INT: {
+            IntObjectInspector ioi = (IntObjectInspector)poi;
+            IntWritable r = reuse == null ? new IntWritable() : (IntWritable)reuse;
+            r.set(ioi.get(o));
+            tbOut.write(r);
+            return;
+          }
+          case LONG: {
+            LongObjectInspector loi = (LongObjectInspector)poi;
+            LongWritable r = reuse == null ? new LongWritable() : (LongWritable)reuse;
+            r.set(loi.get(o));
+            tbOut.write(r);
+            return;
+          }
+          case FLOAT: {
+            FloatObjectInspector foi = (FloatObjectInspector)poi;
+            FloatWritable r = reuse == null ? new FloatWritable() : (FloatWritable)reuse;
+            r.set(foi.get(o));
+            tbOut.write(r);
+            return;
+          }
+          case DOUBLE: {
+            DoubleObjectInspector doi = (DoubleObjectInspector)poi;
+            DoubleWritable r = reuse == null ? new DoubleWritable() : (DoubleWritable)reuse;
+            r.set(doi.get(o));
+            tbOut.write(r);
+            return;
+          }
+          case STRING: {
+            StringObjectInspector soi = (StringObjectInspector)poi;
+            Text t = soi.getPrimitiveWritableObject(o);
+            tbOut.write(t);
+            return;
+          }
+          default: {
+            throw new RuntimeException("Unrecognized type: " + poi.getPrimitiveCategory());
+          }
+        }
+      }
+      case LIST: 
+      case MAP:
+      case STRUCT: {
+        // For complex object, serialize to JSON format
+        String s = SerDeUtils.getJSONString(o, oi);
+        Text t = reuse == null ? new Text() : (Text)reuse;
+        
+        // convert to Text and write it
+        t.set(s);
+        tbOut.write(t);
+      }
+      default: {
+        throw new RuntimeException("Unrecognized type: " + oi.getCategory());
+      }
+    }
+  }
+}

Added: hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q (added)
+++ hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q Mon Aug 24 18:23:19 2009
@@ -0,0 +1,27 @@
+add jar ../build/contrib/hive_contrib.jar;
+
+drop table dest1;
+CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM (
+  FROM src
+  SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe' 
+  USING '/bin/cat'
+  AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe' 
+  RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader'
+) tmap
+INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue;
+
+FROM (
+  FROM src
+  SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+  USING '/bin/cat'
+  AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+  RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader'
+) tmap
+INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue;
+
+SELECT dest1.* FROM dest1;
+
+drop table dest1;

Added: hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out (added)
+++ hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out Mon Aug 24 18:23:19 2009
@@ -0,0 +1,618 @@
+query: drop table dest1
+query: CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE
+query: EXPLAIN
+FROM (
+  FROM src
+  SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe' 
+  USING '/bin/cat'
+  AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe' 
+  RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader'
+) tmap
+INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (. (TOK_TABLE_OR_COL src) key) (. (TOK_TABLE_OR_COL src) value)) (TOK_SERDE (TOK_SERDENAME 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe')) TOK_RECORDREADER '/bin/cat' (TOK_SERDE (TOK_SERDENAME 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe')) (TOK_RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader') (TOK_ALIASLIST tkey tvalue)))))) tmap)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB dest1)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL tkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL tvalue)))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-4 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-4
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        tmap:src 
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1
+              Transform Operator
+                command: /bin/cat
+                output info:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                Select Operator
+                  expressions:
+                        expr: tkey
+                        type: string
+                        expr: tvalue
+                        type: string
+                  outputColumnNames: _col0, _col1
+                  Select Operator
+                    expressions:
+                          expr: UDFToInteger(_col0)
+                          type: int
+                          expr: _col1
+                          type: string
+                    outputColumnNames: _col0, _col1
+                    File Output Operator
+                      compressed: false
+                      GlobalTableId: 1
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                          name: dest1
+
+  Stage: Stage-4
+    Conditional Operator
+      list of dependent Tasks:
+          Move Operator
+            files:
+                hdfs directory: true
+                destination: file:/data/users/njain/hive1/hive1/build/ql/tmp/1595378189/10000
+          Map Reduce
+            Alias -> Map Operator Tree:
+              file:/data/users/njain/hive1/hive1/build/ql/tmp/945395579/10002 
+                  Reduce Output Operator
+                    sort order: 
+                    Map-reduce partition columns:
+                          expr: rand()
+                          type: double
+                    tag: -1
+                    value expressions:
+                          expr: key
+                          type: int
+                          expr: value
+                          type: string
+            Reduce Operator Tree:
+              Extract
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 0
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: dest1
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: dest1
+
+
+query: FROM (
+  FROM src
+  SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+  USING '/bin/cat'
+  AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+  RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader'
+) tmap
+INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue
+Input: default/src
+Output: default/dest1
+query: SELECT dest1.* FROM dest1
+Input: default/dest1
+Output: file:/data/users/njain/hive1/hive1/build/ql/tmp/1097288414/10000
+238	val_238
+86	val_86
+311	val_311
+27	val_27
+165	val_165
+409	val_409
+255	val_255
+278	val_278
+98	val_98
+484	val_484
+265	val_265
+193	val_193
+401	val_401
+150	val_150
+273	val_273
+224	val_224
+369	val_369
+66	val_66
+128	val_128
+213	val_213
+146	val_146
+406	val_406
+429	val_429
+374	val_374
+152	val_152
+469	val_469
+145	val_145
+495	val_495
+37	val_37
+327	val_327
+281	val_281
+277	val_277
+209	val_209
+15	val_15
+82	val_82
+403	val_403
+166	val_166
+417	val_417
+430	val_430
+252	val_252
+292	val_292
+219	val_219
+287	val_287
+153	val_153
+193	val_193
+338	val_338
+446	val_446
+459	val_459
+394	val_394
+237	val_237
+482	val_482
+174	val_174
+413	val_413
+494	val_494
+207	val_207
+199	val_199
+466	val_466
+208	val_208
+174	val_174
+399	val_399
+396	val_396
+247	val_247
+417	val_417
+489	val_489
+162	val_162
+377	val_377
+397	val_397
+309	val_309
+365	val_365
+266	val_266
+439	val_439
+342	val_342
+367	val_367
+325	val_325
+167	val_167
+195	val_195
+475	val_475
+17	val_17
+113	val_113
+155	val_155
+203	val_203
+339	val_339
+0	val_0
+455	val_455
+128	val_128
+311	val_311
+316	val_316
+57	val_57
+302	val_302
+205	val_205
+149	val_149
+438	val_438
+345	val_345
+129	val_129
+170	val_170
+20	val_20
+489	val_489
+157	val_157
+378	val_378
+221	val_221
+92	val_92
+111	val_111
+47	val_47
+72	val_72
+4	val_4
+280	val_280
+35	val_35
+427	val_427
+277	val_277
+208	val_208
+356	val_356
+399	val_399
+169	val_169
+382	val_382
+498	val_498
+125	val_125
+386	val_386
+437	val_437
+469	val_469
+192	val_192
+286	val_286
+187	val_187
+176	val_176
+54	val_54
+459	val_459
+51	val_51
+138	val_138
+103	val_103
+239	val_239
+213	val_213
+216	val_216
+430	val_430
+278	val_278
+176	val_176
+289	val_289
+221	val_221
+65	val_65
+318	val_318
+332	val_332
+311	val_311
+275	val_275
+137	val_137
+241	val_241
+83	val_83
+333	val_333
+180	val_180
+284	val_284
+12	val_12
+230	val_230
+181	val_181
+67	val_67
+260	val_260
+404	val_404
+384	val_384
+489	val_489
+353	val_353
+373	val_373
+272	val_272
+138	val_138
+217	val_217
+84	val_84
+348	val_348
+466	val_466
+58	val_58
+8	val_8
+411	val_411
+230	val_230
+208	val_208
+348	val_348
+24	val_24
+463	val_463
+431	val_431
+179	val_179
+172	val_172
+42	val_42
+129	val_129
+158	val_158
+119	val_119
+496	val_496
+0	val_0
+322	val_322
+197	val_197
+468	val_468
+393	val_393
+454	val_454
+100	val_100
+298	val_298
+199	val_199
+191	val_191
+418	val_418
+96	val_96
+26	val_26
+165	val_165
+327	val_327
+230	val_230
+205	val_205
+120	val_120
+131	val_131
+51	val_51
+404	val_404
+43	val_43
+436	val_436
+156	val_156
+469	val_469
+468	val_468
+308	val_308
+95	val_95
+196	val_196
+288	val_288
+481	val_481
+457	val_457
+98	val_98
+282	val_282
+197	val_197
+187	val_187
+318	val_318
+318	val_318
+409	val_409
+470	val_470
+137	val_137
+369	val_369
+316	val_316
+169	val_169
+413	val_413
+85	val_85
+77	val_77
+0	val_0
+490	val_490
+87	val_87
+364	val_364
+179	val_179
+118	val_118
+134	val_134
+395	val_395
+282	val_282
+138	val_138
+238	val_238
+419	val_419
+15	val_15
+118	val_118
+72	val_72
+90	val_90
+307	val_307
+19	val_19
+435	val_435
+10	val_10
+277	val_277
+273	val_273
+306	val_306
+224	val_224
+309	val_309
+389	val_389
+327	val_327
+242	val_242
+369	val_369
+392	val_392
+272	val_272
+331	val_331
+401	val_401
+242	val_242
+452	val_452
+177	val_177
+226	val_226
+5	val_5
+497	val_497
+402	val_402
+396	val_396
+317	val_317
+395	val_395
+58	val_58
+35	val_35
+336	val_336
+95	val_95
+11	val_11
+168	val_168
+34	val_34
+229	val_229
+233	val_233
+143	val_143
+472	val_472
+322	val_322
+498	val_498
+160	val_160
+195	val_195
+42	val_42
+321	val_321
+430	val_430
+119	val_119
+489	val_489
+458	val_458
+78	val_78
+76	val_76
+41	val_41
+223	val_223
+492	val_492
+149	val_149
+449	val_449
+218	val_218
+228	val_228
+138	val_138
+453	val_453
+30	val_30
+209	val_209
+64	val_64
+468	val_468
+76	val_76
+74	val_74
+342	val_342
+69	val_69
+230	val_230
+33	val_33
+368	val_368
+103	val_103
+296	val_296
+113	val_113
+216	val_216
+367	val_367
+344	val_344
+167	val_167
+274	val_274
+219	val_219
+239	val_239
+485	val_485
+116	val_116
+223	val_223
+256	val_256
+263	val_263
+70	val_70
+487	val_487
+480	val_480
+401	val_401
+288	val_288
+191	val_191
+5	val_5
+244	val_244
+438	val_438
+128	val_128
+467	val_467
+432	val_432
+202	val_202
+316	val_316
+229	val_229
+469	val_469
+463	val_463
+280	val_280
+2	val_2
+35	val_35
+283	val_283
+331	val_331
+235	val_235
+80	val_80
+44	val_44
+193	val_193
+321	val_321
+335	val_335
+104	val_104
+466	val_466
+366	val_366
+175	val_175
+403	val_403
+483	val_483
+53	val_53
+105	val_105
+257	val_257
+406	val_406
+409	val_409
+190	val_190
+406	val_406
+401	val_401
+114	val_114
+258	val_258
+90	val_90
+203	val_203
+262	val_262
+348	val_348
+424	val_424
+12	val_12
+396	val_396
+201	val_201
+217	val_217
+164	val_164
+431	val_431
+454	val_454
+478	val_478
+298	val_298
+125	val_125
+431	val_431
+164	val_164
+424	val_424
+187	val_187
+382	val_382
+5	val_5
+70	val_70
+397	val_397
+480	val_480
+291	val_291
+24	val_24
+351	val_351
+255	val_255
+104	val_104
+70	val_70
+163	val_163
+438	val_438
+119	val_119
+414	val_414
+200	val_200
+491	val_491
+237	val_237
+439	val_439
+360	val_360
+248	val_248
+479	val_479
+305	val_305
+417	val_417
+199	val_199
+444	val_444
+120	val_120
+429	val_429
+169	val_169
+443	val_443
+323	val_323
+325	val_325
+277	val_277
+230	val_230
+478	val_478
+178	val_178
+468	val_468
+310	val_310
+317	val_317
+333	val_333
+493	val_493
+460	val_460
+207	val_207
+249	val_249
+265	val_265
+480	val_480
+83	val_83
+136	val_136
+353	val_353
+172	val_172
+214	val_214
+462	val_462
+233	val_233
+406	val_406
+133	val_133
+175	val_175
+189	val_189
+454	val_454
+375	val_375
+401	val_401
+421	val_421
+407	val_407
+384	val_384
+256	val_256
+26	val_26
+134	val_134
+67	val_67
+384	val_384
+379	val_379
+18	val_18
+462	val_462
+492	val_492
+100	val_100
+298	val_298
+9	val_9
+341	val_341
+498	val_498
+146	val_146
+458	val_458
+362	val_362
+186	val_186
+285	val_285
+348	val_348
+167	val_167
+18	val_18
+273	val_273
+183	val_183
+281	val_281
+344	val_344
+97	val_97
+469	val_469
+315	val_315
+84	val_84
+28	val_28
+37	val_37
+448	val_448
+152	val_152
+348	val_348
+307	val_307
+194	val_194
+414	val_414
+477	val_477
+222	val_222
+126	val_126
+90	val_90
+169	val_169
+403	val_403
+400	val_400
+200	val_200
+97	val_97
+query: drop table dest1

Modified: hadoop/hive/trunk/eclipse-templates/.classpath
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/eclipse-templates/.classpath?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/eclipse-templates/.classpath (original)
+++ hadoop/hive/trunk/eclipse-templates/.classpath Mon Aug 24 18:23:19 2009
@@ -25,6 +25,7 @@
 	<classpathentry exported="true" kind="lib" path="ql/lib/antlr-runtime-3.0.1.jar"/>
 	<classpathentry exported="true" kind="lib" path="testlibs/junit-3.8.1.jar"/>
 	<classpathentry kind="src" path="build/ql/gen-java"/>
+	<classpathentry kind="src" path="build/contrib/test/src"/>
 	<classpathentry kind="src" path="build/ql/test/src"/>
 	<classpathentry kind="src" path="cli/src/java"/>
 	<classpathentry kind="src" path="common/src/java"/>

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+
+public interface RecordReader {
+
+  public void initialize(InputStream in, Configuration conf) throws IOException;
+
+  public Writable createRow() throws IOException;
+
+  public int next(Writable row) throws IOException;
+  
+  public void close() throws IOException;
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Aug 24 18:23:19 2009
@@ -41,10 +41,11 @@
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 import org.apache.hadoop.util.StringUtils;
 
 
@@ -186,7 +187,7 @@
 
       scriptOutputDeserializer = conf.getScriptOutputInfo().getDeserializerClass().newInstance();
       scriptOutputDeserializer.initialize(hconf, conf.getScriptOutputInfo().getProperties());
-
+      
       scriptInputSerializer = (Serializer)conf.getScriptInputInfo().getDeserializerClass().newInstance();
       scriptInputSerializer.initialize(hconf, conf.getScriptInputInfo().getProperties());
 
@@ -222,9 +223,17 @@
       scriptOut = new DataOutputStream(new BufferedOutputStream(scriptPid.getOutputStream()));
       scriptIn = new DataInputStream(new BufferedInputStream(scriptPid.getInputStream()));
       scriptErr = new DataInputStream(new BufferedInputStream(scriptPid.getErrorStream()));
-      outThread = new StreamThread(scriptIn, new OutputStreamProcessor(
+      
+      RecordReader scriptOutputReader = conf.getOutRecordReaderClass().newInstance();
+      scriptOutputReader.initialize(scriptIn, hconf);
+      
+      outThread = new StreamThread(scriptOutputReader, new OutputStreamProcessor(
           scriptOutputDeserializer.getObjectInspector()), "OutputProcessor");
-      errThread = new StreamThread(scriptErr,
+      
+      RecordReader scriptErrReader = conf.getOutRecordReaderClass().newInstance();
+      scriptErrReader.initialize(scriptErr, hconf);
+      
+      errThread = new StreamThread(scriptErrReader,
                                    new ErrorStreamProcessor
                                    (HiveConf.getIntVar(hconf, HiveConf.ConfVars.SCRIPTERRORLIMIT)),
                                    "ErrorProcessor");
@@ -318,7 +327,7 @@
 
 
   interface StreamProcessor {
-    public void processLine(Text line) throws HiveException;
+    public void processLine(Writable line) throws HiveException;
     public void close() throws HiveException;
   }
 
@@ -329,7 +338,7 @@
     public OutputStreamProcessor(ObjectInspector rowInspector) {
       this.rowInspector = rowInspector;
     }
-    public void processLine(Text line) throws HiveException {
+    public void processLine(Writable line) throws HiveException {
       try {
         row = scriptOutputDeserializer.deserialize(line);
       } catch (SerDeException e) {
@@ -360,10 +369,16 @@
       lastReportTime = 0;
     }
     
-    public void processLine(Text line) throws HiveException {
+    public void processLine(Writable line) throws HiveException {
       
       String stringLine = line.toString();
+      int len = 0;
       
+      if (line instanceof Text) 
+        len = ((Text)line).getLength();
+      else if (line instanceof BytesWritable)
+        len = ((BytesWritable)line).getSize();
+          
       // Report progress for each stderr line, but no more frequently than once per minute.
       long now = System.currentTimeMillis();
       // reporter is a member variable of the Operator class.
@@ -375,11 +390,11 @@
       if((maxBytes < 0) || (bytesCopied < maxBytes)) {
         System.err.println(stringLine);
       }
-      if (bytesCopied < maxBytes && bytesCopied + line.getLength() >= maxBytes) {
+      if (bytesCopied < maxBytes && bytesCopied + len >= maxBytes) {
         System.err.println("Operator " + id + " " + getName()
             + ": exceeding stderr limit of " + maxBytes + " bytes, will truncate stderr messages.");
       }      
-      bytesCopied += line.getLength();
+      bytesCopied += len;
     }
     public void close() {
     }
@@ -389,11 +404,11 @@
 
   class StreamThread extends Thread {
 
-    InputStream in;
+    RecordReader in;
     StreamProcessor proc;
     String name;
 
-    StreamThread(InputStream in, StreamProcessor proc, String name) {
+    StreamThread(RecordReader in, StreamProcessor proc, String name) {
       this.in = in;
       this.proc = proc;
       this.name = name;
@@ -401,14 +416,11 @@
     }
 
     public void run() {
-      LineReader lineReader = null;
       try {
-        Text row = new Text();
-        lineReader = new LineReader((InputStream)in, hconf);
+        Writable row = in.createRow();
 
         while(true) {
-          row.clear();
-          long bytes = lineReader.readLine(row);
+          long bytes = in.next(row);
           if(bytes <= 0) {
             break;
           }
@@ -421,10 +433,9 @@
         LOG.warn(StringUtils.stringifyException(th));
       } finally {
         try {
-          if(lineReader != null) {
-            lineReader.close();
+          if (in != null) {
+            in.close();
           }
-          in.close();
           proc.close();
         } catch (Exception e) {
           LOG.warn(name + ": error in closing ..");

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+
+public class TextRecordReader implements RecordReader {
+
+  private LineReader  lineReader;
+  private InputStream in;
+  private Text        row;
+
+  public void initialize(InputStream in, Configuration conf) throws IOException {
+    lineReader = new LineReader(in, conf);
+    this.in = in;
+  }
+
+  public Writable createRow() throws IOException {
+    row = new Text();
+    return row;
+  }
+
+  public int next(Writable row) throws IOException {
+    if (lineReader == null)
+      return -1;
+
+    return lineReader.readLine((Text)row);
+  }
+  
+  public void close() throws IOException {
+    if (in != null)
+      in.close();
+  }
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.util.typedbytes.Type;
+import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableInput;
+import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableOutput;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer;
+
+public class TypedBytesRecordReader implements RecordReader {
+
+  private DataInputStream din;
+  private TypedBytesWritableInput tbIn;
+
+  NonSyncDataOutputBuffer barrStr = new NonSyncDataOutputBuffer();
+  TypedBytesWritableOutput tbOut = new TypedBytesWritableOutput(barrStr);
+
+  ArrayList<Writable> row = new ArrayList<Writable>(0);
+
+  public void initialize(InputStream in, Configuration conf) throws IOException {
+    din = new DataInputStream(in);
+    tbIn = new TypedBytesWritableInput(din);
+  }
+
+  public Writable createRow() throws IOException {
+    BytesWritable retWrit = new BytesWritable();
+    return retWrit;
+  }
+
+  private Writable allocateWritable(Type type) {
+    switch (type) {
+    case BYTE:
+      return new ByteWritable();
+    case BOOL:
+      return new BooleanWritable();
+    case INT:
+      return new IntWritable();
+    case SHORT:
+      return new ShortWritable();
+    case LONG:
+      return new LongWritable();
+    case FLOAT:
+      return new FloatWritable();
+    case DOUBLE:
+      return new DoubleWritable();
+    case STRING:
+      return new Text();
+     default:
+       assert false; // not supported
+    }
+    return null;
+  }
+  
+  public int next(Writable data) throws IOException {
+    int pos = 0;
+    barrStr.reset();
+
+    while (true) {
+      Type type = tbIn.readTypeCode();
+      
+      // it was a empty stream
+      if (type == null)
+        return -1;
+      
+      if (type == Type.ENDOFRECORD) {
+        tbOut.writeEndOfRecord();
+        if (barrStr.getLength() > 0)
+          ((BytesWritable)data).set(barrStr.getData(), 0, barrStr.getLength());
+        return barrStr.getLength();
+      }
+    
+      if (pos >= row.size()) {
+        Writable wrt = allocateWritable(type);
+        assert pos == row.size();
+        row.add(wrt);
+      }
+     
+      switch (type) {
+        case BYTE: {
+          ByteWritable bw = (ByteWritable)row.get(pos);
+          tbIn.readByte(bw);
+          tbOut.writeByte(bw);
+          break;
+        }
+        case BOOL: {
+          BooleanWritable bw = (BooleanWritable)row.get(pos);
+          tbIn.readBoolean(bw);
+          tbOut.writeBoolean(bw);
+          break;
+        }
+        case INT: {
+          IntWritable iw = (IntWritable)row.get(pos);
+          tbIn.readInt(iw);
+          tbOut.writeInt(iw);
+          break;
+        }
+        case SHORT: {
+          ShortWritable sw = (ShortWritable)row.get(pos);
+          tbIn.readShort(sw);
+          tbOut.writeShort(sw);
+          break;
+        }        
+        case LONG: {
+          LongWritable lw = (LongWritable)row.get(pos);
+          tbIn.readLong(lw);
+          tbOut.writeLong(lw);
+          break;
+        }
+        case FLOAT: {
+          FloatWritable fw = (FloatWritable)row.get(pos);
+          tbIn.readFloat(fw);
+          tbOut.writeFloat(fw);
+          break;
+        }
+        case DOUBLE: {
+          DoubleWritable dw = (DoubleWritable)row.get(pos);
+          tbIn.readDouble(dw);
+          tbOut.writeDouble(dw);
+          break;
+        }
+        case STRING: {
+          Text txt = (Text)row.get(pos);
+          tbIn.readText(txt);
+          tbOut.writeText(txt);
+          break;
+        }
+        default:
+          assert false;  // should never come here
+      }
+    
+      pos++;
+    }
+  }
+  
+  public void close() throws IOException {
+    if (din != null)
+      din.close();
+  }
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g Mon Aug 24 18:23:19 2009
@@ -137,6 +137,7 @@
 TOK_HINTARGLIST;
 TOK_USERSCRIPTCOLNAMES;
 TOK_USERSCRIPTCOLSCHEMA;
+TOK_RECORDREADER;
 }
 
 
@@ -371,6 +372,14 @@
 @after { msgs.pop(); }
     : serdeFormat -> ^(TOK_SERDE serdeFormat)
     | serdePropertiesFormat -> ^(TOK_SERDE serdePropertiesFormat)
+    |   -> ^(TOK_SERDE)
+    ;
+
+recordReader
+@init { msgs.push("record reader specification"); }
+@after { msgs.pop(); }
+    : KW_RECORDREADER StringLiteral -> ^(TOK_RECORDREADER StringLiteral)
+    |   -> ^(TOK_RECORDREADER)
     ;
 
 serdeFormat
@@ -730,9 +739,11 @@
     ( KW_SELECT KW_TRANSFORM LPAREN selectExpressionList RPAREN
       | KW_MAP    selectExpressionList
       | KW_REDUCE selectExpressionList )
-    inSerde=serde? KW_USING StringLiteral 
-    ( KW_AS ((LPAREN (aliasList | columnNameTypeList) RPAREN) | (aliasList | columnNameTypeList)) outSerde=serde?)?
-    -> ^(TOK_TRANSFORM selectExpressionList $inSerde? StringLiteral aliasList? columnNameTypeList? $outSerde?)
+    inSerde=serde 
+    KW_USING StringLiteral 
+    ( KW_AS ((LPAREN (aliasList | columnNameTypeList) RPAREN) | (aliasList | columnNameTypeList)))? 
+    outSerde=serde outRec=recordReader
+    -> ^(TOK_TRANSFORM selectExpressionList $inSerde StringLiteral $outSerde $outRec aliasList? columnNameTypeList?)
     ;
     
 selectExpression
@@ -1307,6 +1318,7 @@
 KW_CONTINUE: 'CONTINUE';
 KW_CURSOR: 'CURSOR';
 KW_TRIGGER: 'TRIGGER';
+KW_RECORDREADER: 'RECORDREADER';
 
 
 // Operators

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Aug 24 18:23:19 2009
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hive.ql.exec.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RecordReader;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -132,6 +133,9 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.ql.exec.TextRecordReader;
+import org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader;
 
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -1114,18 +1118,18 @@
     return cmd;
   }
 
-  private tableDesc getTableDescFromSerDe(ASTNode child, String cols, boolean defaultCols) throws SemanticException {
+  private tableDesc getTableDescFromSerDe(ASTNode child, String cols, String colTypes, boolean defaultCols) throws SemanticException {
     if (child.getType() == HiveParser.TOK_SERDENAME) {
       String serdeName = unescapeSQLString(child.getChild(0).getText());
       Class<? extends Deserializer> serdeClass = null;
       
       try {
-        serdeClass = (Class<? extends Deserializer>)Class.forName(serdeName);
+        serdeClass = (Class<? extends Deserializer>)Class.forName(serdeName, true, JavaUtils.getClassLoader());
       } catch (ClassNotFoundException e) {
         throw new SemanticException(e);
       }
       
-      tableDesc tblDesc = PlanUtils.getTableDesc(serdeClass, Integer.toString(Utilities.tabCode), cols, defaultCols);
+      tableDesc tblDesc = PlanUtils.getTableDesc(serdeClass, Integer.toString(Utilities.tabCode), cols, colTypes, defaultCols, true);
       // copy all the properties
       if (child.getChildCount() == 2) {
         ASTNode prop = (ASTNode)((ASTNode)child.getChild(1)).getChild(0);
@@ -1138,7 +1142,7 @@
       return tblDesc;
     }
     else if (child.getType() == HiveParser.TOK_SERDEPROPS) {
-      tableDesc tblDesc = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols, defaultCols);
+      tableDesc tblDesc = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols, colTypes, defaultCols);
       int numChildRowFormat = child.getChildCount();
       for (int numC = 0; numC < numChildRowFormat; numC++)
       {
@@ -1173,112 +1177,136 @@
     // should never come here
     return null;
   }
-
+  
   @SuppressWarnings("nls")
   private Operator genScriptPlan(ASTNode trfm, QB qb,
       Operator input) throws SemanticException {
     // If there is no "AS" clause, the output schema will be "key,value"
     ArrayList<ColumnInfo> outputCols = new ArrayList<ColumnInfo>();
-    boolean defaultOutputColList = true;
-    int     inputSerDeChildNum = -1, outputSerDeChildNum = -1;
-    int     outputColumnNamesPos = -1, outputColumnSchemaPos = -1;
-    int     execPos = 1;
-
+    int     inputSerDeNum  = 1;
+    int     outputSerDeNum = 3, outputRecordReaderNum = 4;
+    int     outputColsNum = 5;
+    boolean outputColNames = false, outputColSchemas = false;
+    int     execPos = 2;
+    boolean defaultOutputCols = false;
+    
     // Go over all the children
-    for (int pos = 0; pos < trfm.getChildCount(); pos++) {
-      ASTNode child = (ASTNode)trfm.getChild(pos);
-      if (child.getType() == HiveParser.TOK_ALIASLIST) {
-        defaultOutputColList = false;
-        outputColumnNamesPos = pos;
-        break;
-      }
-      else if (child.getType() == HiveParser.TOK_TABCOLLIST) {
-        defaultOutputColList = false;
-        outputColumnSchemaPos = pos;
-        break;
-      }
+    if (trfm.getChildCount() > outputColsNum) {
+      ASTNode outCols = (ASTNode)trfm.getChild(outputColsNum);
+      if (outCols.getType() == HiveParser.TOK_ALIASLIST) 
+        outputColNames = true;
+      else if (outCols.getType() == HiveParser.TOK_TABCOLLIST) 
+        outputColSchemas = true;
     }
 
-    // input serde specified
-    if ((trfm.getChildCount() >  1) && 
-        (trfm.getChild(1).getType() == HiveParser.TOK_SERDE)) {
-      inputSerDeChildNum  = 1;
-      execPos++;
-    }
-
-    // output serde specified
-    int checkChildNum = -1;
-    if (outputColumnNamesPos >= 0)
-      checkChildNum = outputColumnNamesPos + 1;
-    else if (outputColumnSchemaPos >= 0)
-      checkChildNum = outputColumnSchemaPos + 1;
-    
-    if (checkChildNum >= 0) {
-      if ((trfm.getChildCount() > (checkChildNum))
-          && (trfm.getChild(checkChildNum).getType() == HiveParser.TOK_SERDE))
-        outputSerDeChildNum  = checkChildNum;
-    }
-    
     // If column type is not specified, use a string
-    if (defaultOutputColList) {
+    if (!outputColNames && !outputColSchemas) {
       outputCols.add(new ColumnInfo("key", TypeInfoFactory.stringTypeInfo, null, false));
       outputCols.add(new ColumnInfo("value", TypeInfoFactory.stringTypeInfo, null, false));
+      defaultOutputCols = true;
     } 
-    else if (outputColumnNamesPos >= 0) {
-      ASTNode collist = (ASTNode) trfm.getChild(outputColumnNamesPos);
-      int ccount = collist.getChildCount();
-      for (int i=0; i < ccount; ++i) {
-        outputCols.add(new ColumnInfo(unescapeIdentifier(((ASTNode)collist.getChild(i)).getText()), TypeInfoFactory.stringTypeInfo, null, false));
-      }
-    }
     else {
-      assert outputColumnSchemaPos >= 0;
-      ASTNode collist = (ASTNode) trfm.getChild(outputColumnSchemaPos);
+      ASTNode collist = (ASTNode) trfm.getChild(outputColsNum);
       int ccount = collist.getChildCount();
-      for (int i=0; i < ccount; ++i) {
-        ASTNode child = (ASTNode) collist.getChild(i);
-        assert child.getType() == HiveParser.TOK_TABCOL;  
-        outputCols.add(new ColumnInfo(unescapeIdentifier(((ASTNode)child.getChild(0)).getText()), 
-                                      TypeInfoUtils.getTypeInfoFromTypeString(DDLSemanticAnalyzer.getTypeName(((ASTNode)child.getChild(1)).getType())), null, false));
+      
+      if (outputColNames) {
+        for (int i=0; i < ccount; ++i) {
+          outputCols.add(new ColumnInfo(unescapeIdentifier(((ASTNode)collist.getChild(i)).getText()), TypeInfoFactory.stringTypeInfo, null, false));
+        }
+      }
+      else {
+        for (int i=0; i < ccount; ++i) {
+          ASTNode child = (ASTNode) collist.getChild(i);
+          assert child.getType() == HiveParser.TOK_TABCOL;  
+          outputCols.add(new ColumnInfo(unescapeIdentifier(((ASTNode)child.getChild(0)).getText()), 
+                                        TypeInfoUtils.getTypeInfoFromTypeString(DDLSemanticAnalyzer.getTypeName(((ASTNode)child.getChild(1)).getType())), null, false));
+        }
       }
     }
-
+    
     RowResolver out_rwsch = new RowResolver();
     StringBuilder columns = new StringBuilder();
+    StringBuilder columnTypes = new StringBuilder();
+    
     for (int i = 0; i < outputCols.size(); ++i) {
       if (i != 0) {
         columns.append(",");
+        columnTypes.append(",");
       }
+      
       columns.append(outputCols.get(i).getInternalName());
+      columnTypes.append(outputCols.get(i).getType().getTypeName());
+      
       out_rwsch.put(
         qb.getParseInfo().getAlias(),
         outputCols.get(i).getInternalName(),
         outputCols.get(i));
     }
 
+    StringBuilder inpColumns = new StringBuilder();
+    StringBuilder inpColumnTypes = new StringBuilder();
+    Vector<ColumnInfo> inputSchema = opParseCtx.get(input).getRR().getColumnInfos();
+    for (int i = 0; i < inputSchema.size(); ++i) {
+      if (i != 0) {
+        inpColumns.append(",");
+        inpColumnTypes.append(",");
+      }
+      
+      inpColumns.append(inputSchema.get(i).getInternalName());
+      inpColumnTypes.append(inputSchema.get(i).getType().getTypeName());        
+    }
+    
     tableDesc outInfo;
     tableDesc inInfo;
+    String    defaultSerdeName =  conf.getVar(HiveConf.ConfVars.HIVESCRIPTSERDE);
+    Class<? extends Deserializer> serde;
 
-    if (inputSerDeChildNum < 0)
-      inInfo = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.tabCode), "");
-    else 
-      inInfo = getTableDescFromSerDe((ASTNode)(((ASTNode)trfm.getChild(inputSerDeChildNum))).getChild(0), "", false);
+    try {
+      serde = (Class<? extends Deserializer>)Class.forName(defaultSerdeName, true, JavaUtils.getClassLoader());
+    } catch (ClassNotFoundException e) {
+      throw new SemanticException(e);
+    }
 
-    if (outputSerDeChildNum < 0)
-      outInfo = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.tabCode), columns.toString(), defaultOutputColList);
+    // Input and Output Serdes
+    if (trfm.getChild(inputSerDeNum).getChildCount() > 0)
+      inInfo = getTableDescFromSerDe((ASTNode)(((ASTNode)trfm.getChild(inputSerDeNum))).getChild(0), inpColumns.toString(), inpColumnTypes.toString(), false);
     else 
-      outInfo = getTableDescFromSerDe((ASTNode)(((ASTNode)trfm.getChild(outputSerDeChildNum))).getChild(0), columns.toString(), defaultOutputColList);
+      inInfo = PlanUtils.getTableDesc(serde, Integer.toString(Utilities.tabCode), inpColumns.toString(), inpColumnTypes.toString(), false, true);
 
+    if (trfm.getChild(inputSerDeNum).getChildCount() > 0)
+      outInfo = getTableDescFromSerDe((ASTNode)(((ASTNode)trfm.getChild(outputSerDeNum))).getChild(0), columns.toString(), columnTypes.toString(), false);
+    // This is for backward compatibility. If the user did not specify the output column list, we assume that there are 2 columns: key and value.
+    // However, if the script outputs: col1, col2, col3 seperated by TAB, the requirement is: key is col and value is (col2 TAB col3)
+    else
+      outInfo = PlanUtils.getTableDesc(serde, Integer.toString(Utilities.tabCode), columns.toString(), columnTypes.toString(), defaultOutputCols);
+
+    // Output record readers
+    Class <? extends RecordReader> outRecordReader = getRecordReader((ASTNode)trfm.getChild(outputRecordReaderNum));
+    
     Operator output = putOpInsertMap(OperatorFactory
             .getAndMakeChild(
-                new scriptDesc(
-                               getFixedCmd(stripQuotes(trfm.getChild(execPos).getText())),
-                      outInfo, inInfo),
+                new scriptDesc(getFixedCmd(stripQuotes(trfm.getChild(execPos).getText())), 
+                    inInfo, outInfo, outRecordReader),
                 new RowSchema(out_rwsch.getColumnInfos()), input), out_rwsch);
 
     return output;
   }
 
+  private Class<? extends RecordReader> getRecordReader(ASTNode node) throws SemanticException {
+    String name;
+
+    if (node.getChildCount() == 0) 
+      name = conf.getVar(HiveConf.ConfVars.HIVESCRIPTRECORDREADER);
+    else 
+      name = unescapeSQLString(node.getChild(0).getText());
+    
+    try {
+      return (Class<? extends RecordReader>)Class.forName(name, true, JavaUtils.getClassLoader());
+    } catch (ClassNotFoundException e) {
+      throw new SemanticException(e);
+    }
+  }
+  
   /**
    * This function is a wrapper of parseInfo.getGroupByForClause which automatically
    * translates SELECT DISTINCT a,b,c to SELECT a,b,c GROUP BY a,b,c.

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Mon Aug 24 18:23:19 2009
@@ -106,6 +106,13 @@
   public static tableDesc getTableDesc(Class<? extends Deserializer> serdeClass,
                                        String separatorCode, String columns, String columnTypes,
       boolean lastColumnTakesRestOfTheLine) {
+    return getTableDesc(serdeClass, separatorCode, columns, columnTypes, lastColumnTakesRestOfTheLine, false);
+  }
+
+  public static tableDesc getTableDesc(Class<? extends Deserializer> serdeClass,
+                                       String separatorCode, String columns, String columnTypes,
+                                       boolean lastColumnTakesRestOfTheLine, boolean useJSONForLazy) {
+
     Properties properties = Utilities.makeProperties(
       Constants.SERIALIZATION_FORMAT, separatorCode,
       Constants.LIST_COLUMNS, columns);
@@ -117,6 +124,16 @@
           Constants.SERIALIZATION_LAST_COLUMN_TAKES_REST,
           "true");
     }
+
+    // It is not a very clean way, and should be modified later - due to compatiblity reasons,
+    // user sees the results as json for custom scripts and has no way for specifying that.
+    // Right now, it is hard-coded in the code
+    if (useJSONForLazy)
+      properties.setProperty(
+          Constants.SERIALIZATION_USE_JSON_OBJECTS,
+          "true");
+      
+      
     return new tableDesc(
       serdeClass,
       TextInputFormat.class,

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java Mon Aug 24 18:23:19 2009
@@ -20,6 +20,8 @@
 
 import java.io.Serializable;
 
+import org.apache.hadoop.hive.ql.exec.RecordReader;
+
 @explain(displayName="Transform Operator")
 public class scriptDesc implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -28,16 +30,19 @@
   private tableDesc scriptOutputInfo;
   // Describe how to serialize data out to user script
   private tableDesc scriptInputInfo;
+  private Class<? extends RecordReader> outRecordReaderClass;
 
   public scriptDesc() { }
   public scriptDesc(
     final String scriptCmd,
+    final tableDesc scriptInputInfo,
     final tableDesc scriptOutputInfo,
-    final tableDesc scriptInputInfo) {
-
+    final Class<? extends RecordReader> outRecordReaderClass) {
+    
     this.scriptCmd = scriptCmd;
-    this.scriptOutputInfo = scriptOutputInfo;
     this.scriptInputInfo = scriptInputInfo;
+    this.scriptOutputInfo = scriptOutputInfo;
+    this.outRecordReaderClass = outRecordReaderClass;
   }
   
   @explain(displayName="command")
@@ -61,4 +66,17 @@
   public void setScriptInputInfo(tableDesc scriptInputInfo) {
     this.scriptInputInfo = scriptInputInfo;
   }
+  /**
+   * @return the outRecordReaderClass
+   */
+  public Class<? extends RecordReader> getOutRecordReaderClass() {
+    return outRecordReaderClass;
+  }
+  /**
+   * @param outRecordReaderClass the outRecordReaderClass to set
+   */
+  public void setOutRecordReaderClass(
+      Class<? extends RecordReader> outRecordReaderClass) {
+    this.outRecordReaderClass = outRecordReaderClass;
+  }
 }

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util.typedbytes;
+
+/**
+ * The possible type codes.
+ */
+public enum Type {
+
+  // codes for supported types (< 50):
+  BYTES(0),
+  BYTE(1),
+  BOOL(2),
+  INT(3),
+  LONG(4),
+  FLOAT(5),
+  DOUBLE(6),
+  STRING(7),
+  VECTOR(8),
+  LIST(9),
+  MAP(10),
+  SHORT(11),
+  
+  // application-specific codes (50-200):
+  WRITABLE(50),
+  
+  ENDOFRECORD(100),
+  
+  // low-level codes (> 200):
+  MARKER(255);
+
+  final int code;
+
+  Type(int code) {
+    this.code = code;
+  }
+}
\ No newline at end of file

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util.typedbytes;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.record.Buffer;
+
+/**
+ * Provides functionality for reading typed bytes.
+ */
+public class TypedBytesInput {
+
+  private DataInput in;
+
+  private TypedBytesInput() {}
+
+  private void setDataInput(DataInput in) {
+    this.in = in;
+  }
+
+  private static ThreadLocal tbIn = new ThreadLocal() {
+    protected synchronized Object initialValue() {
+      return new TypedBytesInput();
+    }
+  };
+
+  /**
+   * Get a thread-local typed bytes input for the supplied {@link DataInput}.
+   * @param in data input object
+   * @return typed bytes input corresponding to the supplied {@link DataInput}.
+   */
+  public static TypedBytesInput get(DataInput in) {
+    TypedBytesInput bin = (TypedBytesInput) tbIn.get();
+    bin.setDataInput(in);
+    return bin;
+  }
+
+  /** Creates a new instance of TypedBytesInput. */
+  public TypedBytesInput(DataInput in) {
+    this.in = in;
+  }
+
+  /**
+   * Reads a typed bytes sequence and converts it to a Java object. The first 
+   * byte is interpreted as a type code, and then the right number of 
+   * subsequent bytes are read depending on the obtained type.
+   * @return the obtained object or null when the end of the file is reached
+   * @throws IOException
+   */
+  public Object read() throws IOException {
+    int code = 1;
+    try {
+      code = in.readUnsignedByte();
+    } catch (EOFException eof) {
+      return null;
+    }
+    if (code == Type.BYTES.code) {
+      return new Buffer(readBytes());
+    } else if (code == Type.BYTE.code) {
+      return readByte();
+    } else if (code == Type.BOOL.code) {
+      return readBool();
+    } else if (code == Type.INT.code) {
+      return readInt();
+    } else if (code == Type.SHORT.code) {
+      return readShort();
+    } else if (code == Type.LONG.code) {
+      return readLong();
+    } else if (code == Type.FLOAT.code) {
+      return readFloat();
+    } else if (code == Type.DOUBLE.code) {
+      return readDouble();
+    } else if (code == Type.STRING.code) {
+      return readString();
+    } else if (code == Type.VECTOR.code) {
+      return readVector();
+    } else if (code == Type.LIST.code) {
+      return readList();
+    } else if (code == Type.MAP.code) {
+      return readMap();
+    } else if (code == Type.MARKER.code) {
+      return null;
+    } else if (50 <= code && code <= 200) { // application-specific typecodes
+      return new Buffer(readBytes());
+    } else {
+      throw new RuntimeException("unknown type");
+    }
+  }
+
+  /**
+   * Reads a typed bytes sequence. The first byte is interpreted as a type code,
+   * and then the right number of subsequent bytes are read depending on the
+   * obtained type.
+   * 
+   * @return the obtained typed bytes sequence or null when the end of the file
+   *         is reached
+   * @throws IOException
+   */
+  public byte[] readRaw() throws IOException {
+    int code = -1;
+    try {
+      code = in.readUnsignedByte();
+    } catch (EOFException eof) {
+      return null;
+    }
+    if (code == Type.BYTES.code) {
+      return readRawBytes();
+    } else if (code == Type.BYTE.code) {
+      return readRawByte();
+    } else if (code == Type.BOOL.code) {
+      return readRawBool();
+    } else if (code == Type.INT.code) {
+      return readRawInt();
+    } else if (code == Type.LONG.code) {
+      return readRawLong();
+    } else if (code == Type.FLOAT.code) {
+      return readRawFloat();
+    } else if (code == Type.DOUBLE.code) {
+      return readRawDouble();
+    } else if (code == Type.STRING.code) {
+      return readRawString();
+    } else if (code == Type.VECTOR.code) {
+      return readRawVector();
+    } else if (code == Type.LIST.code) {
+      return readRawList();
+    } else if (code == Type.MAP.code) {
+      return readRawMap();
+    } else if (code == Type.MARKER.code) {
+      return null;
+    } else if (50 <= code && code <= 200) { // application-specific typecodes
+      return readRawBytes();
+    } else {
+      throw new RuntimeException("unknown type");
+    }
+  }
+
+  /**
+   * Reads a type byte and returns the corresponding {@link Type}.
+   * @return the obtained Type or null when the end of the file is reached
+   * @throws IOException
+   */
+  public Type readType() throws IOException {
+    int code = -1;
+    try {
+      code = in.readUnsignedByte();
+    } catch (EOFException eof) {
+      return null;
+    }
+    for (Type type : Type.values()) {
+      if (type.code == code) {
+        return type;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Skips a type byte.
+   * @return true iff the end of the file was not reached
+   * @throws IOException
+   */
+  public boolean skipType() throws IOException {
+    try {
+      in.readByte();
+      return true;
+    } catch (EOFException eof) {
+      return false;
+    }
+  }
+
+  /**
+   * Reads the bytes following a <code>Type.BYTES</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readBytes() throws IOException {
+    int length = in.readInt();
+    byte[] bytes = new byte[length];
+    in.readFully(bytes);
+    return bytes;
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.BYTES</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawBytes() throws IOException {
+    int length = in.readInt();
+    byte[] bytes = new byte[5 + length];
+    bytes[0] = (byte) Type.BYTES.code;
+    bytes[1] = (byte) (0xff & (length >> 24));
+    bytes[2] = (byte) (0xff & (length >> 16));
+    bytes[3] = (byte) (0xff & (length >> 8));
+    bytes[4] = (byte) (0xff & length);
+    in.readFully(bytes, 5, length);
+    return bytes;
+  }
+
+  /**
+   * Reads the byte following a <code>Type.BYTE</code> code.
+   * @return the obtained byte
+   * @throws IOException
+   */
+  public byte readByte() throws IOException {
+    return in.readByte();
+  }
+
+  /**
+   * Reads the raw byte following a <code>Type.BYTE</code> code.
+   * @return the obtained byte
+   * @throws IOException
+   */
+  public byte[] readRawByte() throws IOException {
+    byte[] bytes = new byte[2];
+    bytes[0] = (byte) Type.BYTE.code;
+    in.readFully(bytes, 1, 1);
+    return bytes;
+  }
+
+  /**
+   * Reads the boolean following a <code>Type.BOOL</code> code.
+   * @return the obtained boolean
+   * @throws IOException
+   */
+  public boolean readBool() throws IOException {
+    return in.readBoolean();
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.BOOL</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawBool() throws IOException {
+    byte[] bytes = new byte[2];
+    bytes[0] = (byte) Type.BOOL.code;
+    in.readFully(bytes, 1, 1);
+    return bytes;
+  }
+
+  /**
+   * Reads the integer following a <code>Type.INT</code> code.
+   * @return the obtained integer
+   * @throws IOException
+   */
+  public int readInt() throws IOException {
+    return in.readInt();
+  }
+
+  /**
+   * Reads the short following a <code>Type.SHORT</code> code.
+   * @return the obtained short
+   * @throws IOException
+   */
+  public short readShort() throws IOException {
+    return in.readShort();
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.INT</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawInt() throws IOException {
+    byte[] bytes = new byte[5];
+    bytes[0] = (byte) Type.INT.code;
+    in.readFully(bytes, 1, 4);
+    return bytes;
+  }
+
+  /**
+   * Reads the long following a <code>Type.LONG</code> code.
+   * @return the obtained long
+   * @throws IOException
+   */
+  public long readLong() throws IOException {
+    return in.readLong();
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.LONG</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawLong() throws IOException {
+    byte[] bytes = new byte[9];
+    bytes[0] = (byte) Type.LONG.code;
+    in.readFully(bytes, 1, 8);
+    return bytes;
+  }
+
+  /**
+   * Reads the float following a <code>Type.FLOAT</code> code.
+   * @return the obtained float
+   * @throws IOException
+   */
+  public float readFloat() throws IOException {
+    return in.readFloat();
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.FLOAT</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawFloat() throws IOException {
+    byte[] bytes = new byte[5];
+    bytes[0] = (byte) Type.FLOAT.code;
+    in.readFully(bytes, 1, 4);
+    return bytes;
+  }
+
+  /**
+   * Reads the double following a <code>Type.DOUBLE</code> code.
+   * @return the obtained double
+   * @throws IOException
+   */
+  public double readDouble() throws IOException {
+    return in.readDouble();
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.DOUBLE</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawDouble() throws IOException {
+    byte[] bytes = new byte[9];
+    bytes[0] = (byte) Type.DOUBLE.code;
+    in.readFully(bytes, 1, 8);
+    return bytes;
+  }
+
+  /**
+   * Reads the string following a <code>Type.STRING</code> code.
+   * @return the obtained string
+   * @throws IOException
+   */
+  public String readString() throws IOException {
+    return WritableUtils.readString(in);
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.STRING</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawString() throws IOException {
+    int length = in.readInt();
+    byte[] bytes = new byte[5 + length];
+    bytes[0] = (byte) Type.STRING.code;
+    bytes[1] = (byte) (0xff & (length >> 24));
+    bytes[2] = (byte) (0xff & (length >> 16));
+    bytes[3] = (byte) (0xff & (length >> 8));
+    bytes[4] = (byte) (0xff & length);
+    in.readFully(bytes, 5, length);
+    return bytes;
+  }
+
+  /**
+   * Reads the vector following a <code>Type.VECTOR</code> code.
+   * @return the obtained vector
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public ArrayList readVector() throws IOException {
+    int length = readVectorHeader();
+    ArrayList result = new ArrayList(length);
+    for (int i = 0; i < length; i++) {
+      result.add(read());
+    }
+    return result;
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.VECTOR</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawVector() throws IOException {
+    Buffer buffer = new Buffer();
+    int length = readVectorHeader();
+    buffer.append(new byte[] {
+      (byte) Type.VECTOR.code,
+      (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)),
+      (byte) (0xff & (length >> 8)), (byte) (0xff & length)
+    });
+    for (int i = 0; i < length; i++) {
+      buffer.append(readRaw());
+    }
+    return buffer.get();
+  }
+
+  /**
+   * Reads the header following a <code>Type.VECTOR</code> code.
+   * @return the number of elements in the vector
+   * @throws IOException
+   */
+  public int readVectorHeader() throws IOException {
+    return in.readInt();
+  }
+
+  /**
+   * Reads the list following a <code>Type.LIST</code> code.
+   * @return the obtained list
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public List readList() throws IOException {
+    List list = new ArrayList();
+    Object obj = read();
+    while (obj != null) {
+      list.add(obj);
+      obj = read();
+    }
+    return list;
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.LIST</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawList() throws IOException {
+    Buffer buffer = new Buffer(new byte[] { (byte) Type.LIST.code });
+    byte[] bytes = readRaw();
+    while (bytes != null) {
+      buffer.append(bytes);
+      bytes = readRaw();
+    }
+    buffer.append(new byte[] { (byte) Type.MARKER.code });
+    return buffer.get();
+  }
+
+  /**
+   * Reads the map following a <code>Type.MAP</code> code.
+   * @return the obtained map
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public TreeMap readMap() throws IOException {
+    int length = readMapHeader();
+    TreeMap result = new TreeMap();
+    for (int i = 0; i < length; i++) {
+      Object key = read();
+      Object value = read();
+      result.put(key, value);
+    }
+    return result;
+  }
+
+  /**
+   * Reads the raw bytes following a <code>Type.MAP</code> code.
+   * @return the obtained bytes sequence
+   * @throws IOException
+   */
+  public byte[] readRawMap() throws IOException {
+    Buffer buffer = new Buffer();
+    int length = readMapHeader();
+    buffer.append(new byte[] {
+      (byte) Type.MAP.code,
+      (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)),
+      (byte) (0xff & (length >> 8)), (byte) (0xff & length)
+    });
+    for (int i = 0; i < length; i++) {
+      buffer.append(readRaw());
+      buffer.append(readRaw());
+    }
+    return buffer.get();
+  }
+
+  /**
+   * Reads the header following a <code>Type.MAP</code> code.
+   * @return the number of key-value pairs in the map
+   * @throws IOException
+   */
+  public int readMapHeader() throws IOException {
+    return in.readInt();
+  }
+
+}