Posted to commits@hive.apache.org by zs...@apache.org on 2009/08/24 20:23:22 UTC
svn commit: r807330 [1/3] - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
contrib/src/java/org/apache/hadoop/hive/contrib/serde2/
contrib/src/test/queries/clientpositive/
contrib/src/test/results/clientpositive/ eclipse-t...
Author: zshao
Date: Mon Aug 24 18:23:19 2009
New Revision: 807330
URL: http://svn.apache.org/viewvc?rev=807330&view=rev
Log:
HIVE-708. Add TypedBytes SerDe for transform. (Namit Jain via zshao)
Added:
hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java
hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q
hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesOutput.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesRecordInput.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesRecordOutput.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesWritable.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesWritableInput.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesWritableOutput.java
hadoop/hive/trunk/ql/src/test/queries/clientpositive/input38.q
hadoop/hive/trunk/ql/src/test/results/clientpositive/input38.q.out
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/eclipse-templates/.classpath
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
hadoop/hive/trunk/ql/src/test/results/clientnegative/script_error.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input14.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input14_limit.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input17.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input18.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input20.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input33.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input34.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input35.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input36.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input5.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce4.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce7.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/mapreduce8.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/nullscript.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_transform.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/regexp_extract.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/transform_ppr1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/transform_ppr2.q.out
hadoop/hive/trunk/ql/src/test/results/compiler/parse/input20.q.out
hadoop/hive/trunk/ql/src/test/results/compiler/parse/input4.q.out
hadoop/hive/trunk/ql/src/test/results/compiler/parse/input5.q.out
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
hadoop/hive/trunk/serde/if/serde.thrift
hadoop/hive/trunk/serde/src/gen-java/org/apache/hadoop/hive/serde/Constants.java
hadoop/hive/trunk/serde/src/gen-php/serde_constants.php
hadoop/hive/trunk/serde/src/gen-py/org_apache_hadoop_hive_serde/constants.py
hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/InputByteBuffer.java
hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/OutputByteBuffer.java
hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Aug 24 18:23:19 2009
@@ -23,6 +23,8 @@
HIVE-187. Preliminary ODBC Support. (Eric Hwang via rmurthy)
+ HIVE-708. Add TypedBytes SerDe for transform. (Namit Jain via zshao)
+
IMPROVEMENTS
HIVE-760. Add version info to META-INF/MANIFEST.MF.
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Aug 24 18:23:19 2009
@@ -133,6 +133,10 @@
//Location of Hive run time structured log file
HIVEHISTORYFILELOC("hive.querylog.location", "/tmp/"+System.getProperty("user.name")),
+ // Default serde and record reader for user scripts
+ HIVESCRIPTSERDE("hive.script.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
+ HIVESCRIPTRECORDREADER("hive.script.recordreader", "org.apache.hadoop.hive.ql.exec.TextRecordReader"),
+
// HWI
HIVEHWILISTENHOST("hive.hwi.listen.host","0.0.0.0"),
HIVEHWILISTENPORT("hive.hwi.listen.port","9999"),
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Mon Aug 24 18:23:19 2009
@@ -339,4 +339,16 @@
<description>Whether Hive Transform/Map/Reduce Clause should automatically send progress information to TaskTracker to avoid the task getting killed because of inactivity. Hive sends progress information when the script is outputting to stderr. This option removes the need to periodically produce stderr messages, but users should be cautious because this may prevent TaskTracker from killing scripts stuck in infinite loops. </description>
</property>
+<property>
+ <name>hive.script.serde</name>
+ <value>org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe</value>
+  <description>The default serde for transmitting input data to and reading output data from the user scripts. </description>
+</property>
+
+<property>
+ <name>hive.script.recordreader</name>
+ <value>org.apache.hadoop.hive.ql.exec.TextRecordReader</value>
+ <description>The default record reader for reading data from the user scripts. </description>
+</property>
+
</configuration>
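
For context on how these two defaults take effect: when a TRANSFORM clause omits an explicit serde or record reader, the class name is read from HiveConf and loaded reflectively through JavaUtils.getClassLoader() (see the SemanticAnalyzer changes below). A minimal sketch of that resolution path, assuming only the APIs visible in this commit; the wrapper class name is hypothetical:

    import org.apache.hadoop.hive.common.JavaUtils;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.RecordReader;

    public class ScriptDefaultsSketch {
      // Mirrors SemanticAnalyzer.getRecordReader() below: fall back to
      // hive.script.recordreader when the query has no RECORDREADER clause.
      @SuppressWarnings("unchecked")
      static Class<? extends RecordReader> resolveRecordReader(HiveConf conf,
          String explicitName) throws ClassNotFoundException {
        String name = (explicitName != null) ? explicitName
            : conf.getVar(HiveConf.ConfVars.HIVESCRIPTRECORDREADER);
        return (Class<? extends RecordReader>) Class.forName(name, true,
            JavaUtils.getClassLoader());
      }
    }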
Added: hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java (added)
+++ hadoop/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.serde2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.util.typedbytes.Type;
+import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableInput;
+import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableOutput;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer;
+import org.apache.hadoop.hive.ql.io.NonSyncDataInputBuffer;
+
+/**
+ * TypedBytesSerDe uses typed bytes to serialize/deserialize.
+ *
+ * More information on the typed bytes format, as used by Dumbo:
+ * http://issues.apache.org/jira/browse/HADOOP-1722
+ * A faster decoder, reportedly about 25% quicker than the pure-Python
+ * version, is available at http://github.com/klbostee/ctypedbytes/tree/master
+ */
+public class TypedBytesSerDe implements SerDe {
+
+ public static final Log LOG = LogFactory.getLog(TypedBytesSerDe.class.getName());
+
+ int numColumns;
+ StructObjectInspector rowOI;
+ ArrayList<Object> row;
+
+ BytesWritable serializeBytesWritable;
+ NonSyncDataOutputBuffer barrStr;
+ TypedBytesWritableOutput tbOut;
+
+ NonSyncDataInputBuffer inBarrStr;
+ TypedBytesWritableInput tbIn;
+
+ List<String> columnNames;
+ List<TypeInfo> columnTypes;
+
+ @Override
+ public void initialize(Configuration conf, Properties tbl)
+ throws SerDeException {
+
+ // We can get the table definition from tbl.
+ serializeBytesWritable = new BytesWritable();
+ barrStr = new NonSyncDataOutputBuffer();
+ tbOut = new TypedBytesWritableOutput(barrStr);
+
+ inBarrStr = new NonSyncDataInputBuffer();
+ tbIn = new TypedBytesWritableInput(inBarrStr);
+
+ // Read the configuration parameters
+ String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
+ String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+ columnNames = Arrays.asList(columnNameProperty.split(","));
+ columnTypes = new ArrayList<TypeInfo>();
+ List<String> columnTypeProps = Arrays.asList(columnTypeProperty.split(","));
+
+ for (String colType : columnTypeProps) {
+ columnTypes.add(TypeInfoUtils
+ .getTypeInfoFromTypeString(colType));
+ }
+
+ assert columnNames.size() == columnTypes.size();
+ numColumns = columnNames.size();
+
+ // All columns have to be primitive.
+ for (int c = 0; c < numColumns; c++) {
+ if (columnTypes.get(c).getCategory() != Category.PRIMITIVE) {
+ throw new SerDeException(getClass().getName()
+ + " only accepts primitive columns, but column[" + c
+ + "] named " + columnNames.get(c) + " has category "
+ + columnTypes.get(c).getCategory());
+ }
+ }
+
+ // Constructing the row ObjectInspector:
+ // The row consists of some string columns, each column will be a java
+ // String object.
+ List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(columnNames.size());
+ for (int c = 0; c < numColumns; c++) {
+ columnOIs.add(TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(columnTypes.get(c)));
+ }
+
+ // StandardStruct uses ArrayList to store the row.
+ rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
+
+ // Constructing the row object, etc, which will be reused for all rows.
+ row = new ArrayList<Object>(numColumns);
+ for (int c = 0; c < numColumns; c++) {
+ row.add(null);
+ }
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return rowOI;
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return BytesWritable.class;
+ }
+
+ @Override
+ public Object deserialize(Writable blob) throws SerDeException {
+
+ BytesWritable data = (BytesWritable)blob;
+ inBarrStr.reset(data.get(), 0, data.getSize()-1);
+
+ try {
+
+ for (int i=0; i<columnNames.size(); i++) {
+ row.set(i, deserializeField(tbIn, columnTypes.get(i), row.get(i)));
+ }
+
+ // The next byte should be the marker
+ assert tbIn.readTypeCode() == Type.ENDOFRECORD;
+
+ } catch (IOException e) {
+ throw new SerDeException(e);
+ }
+
+ return row;
+ }
+
+ static Object deserializeField(TypedBytesWritableInput in, TypeInfo type, Object reuse) throws IOException {
+
+ // read the type
+ in.readType();
+
+ switch (type.getCategory()) {
+ case PRIMITIVE: {
+ PrimitiveTypeInfo ptype = (PrimitiveTypeInfo)type;
+ switch (ptype.getPrimitiveCategory()) {
+
+ case VOID: {
+ return null;
+ }
+
+ case BOOLEAN: {
+ BooleanWritable r = reuse == null ? new BooleanWritable() : (BooleanWritable)reuse;
+ r = (BooleanWritable)in.readBoolean(r);
+ return r;
+ }
+ case BYTE: {
+ ByteWritable r = reuse == null ? new ByteWritable() : (ByteWritable)reuse;
+ r = (ByteWritable)in.readByte(r);
+ return r;
+ }
+ case SHORT: {
+ ShortWritable r = reuse == null ? new ShortWritable() : (ShortWritable)reuse;
+ r = (ShortWritable)in.readShort(r);
+ return r;
+ }
+ case INT: {
+ IntWritable r = reuse == null ? new IntWritable() : (IntWritable)reuse;
+ r = (IntWritable)in.readInt(r);
+ return r;
+ }
+ case LONG: {
+ LongWritable r = reuse == null ? new LongWritable() : (LongWritable)reuse;
+ r = (LongWritable)in.readLong(r);
+ return r;
+ }
+ case FLOAT: {
+ FloatWritable r = reuse == null ? new FloatWritable() : (FloatWritable)reuse;
+ r = (FloatWritable)in.readFloat(r);
+ return r;
+ }
+ case DOUBLE: {
+ DoubleWritable r = reuse == null ? new DoubleWritable() : (DoubleWritable)reuse;
+ r = (DoubleWritable)in.readDouble(r);
+ return r;
+ }
+ case STRING: {
+ Text r = reuse == null ? new Text() : (Text)reuse;
+ r = (Text)in.readText(r);
+ return r;
+ }
+ default: {
+ throw new RuntimeException("Unrecognized type: " + ptype.getPrimitiveCategory());
+ }
+ }
+ }
+ // Currently, deserialization of complex types is not supported
+ case LIST:
+ case MAP:
+ case STRUCT:
+ default: {
+ throw new RuntimeException("Unsupported category: " + type.getCategory());
+ }
+ }
+ }
+
+
+
+ @Override
+ public Writable serialize(Object obj, ObjectInspector objInspector)
+ throws SerDeException {
+ try {
+ barrStr.reset();
+ StructObjectInspector soi = (StructObjectInspector)objInspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+
+ for (int i = 0; i < numColumns; i++) {
+ Object o = soi.getStructFieldData(obj, fields.get(i));
+ ObjectInspector oi = fields.get(i).getFieldObjectInspector();
+ serializeField(o, oi, row.get(i));
+ }
+
+ // End of the record is part of the data
+ tbOut.writeEndOfRecord();
+
+ serializeBytesWritable.set(barrStr.getData(), 0, barrStr.getLength());
+ } catch (IOException e) {
+ throw new SerDeException(e.getMessage());
+ }
+ return serializeBytesWritable;
+ }
+
+ private byte[] tmpByteArr = new byte[1];
+
+ private void serializeField(Object o, ObjectInspector oi, Object reuse) throws IOException {
+ switch (oi.getCategory()) {
+ case PRIMITIVE: {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi;
+ switch (poi.getPrimitiveCategory()) {
+ case VOID: {
+ return;
+ }
+ case BOOLEAN: {
+ BooleanObjectInspector boi = (BooleanObjectInspector)poi;
+ BooleanWritable r = reuse == null ? new BooleanWritable() : (BooleanWritable)reuse;
+ r.set(boi.get(o));
+ tbOut.write(r);
+ return;
+ }
+ case BYTE: {
+ ByteObjectInspector boi = (ByteObjectInspector)poi;
+ BytesWritable r = reuse == null ? new BytesWritable() : (BytesWritable)reuse;
+ tmpByteArr[0] = boi.get(o);
+ r.set(tmpByteArr, 0, 1);
+ tbOut.write(r);
+ return;
+ }
+ case SHORT: {
+ ShortObjectInspector spoi = (ShortObjectInspector)poi;
+ ShortWritable r = reuse == null ? new ShortWritable() : (ShortWritable)reuse;
+ r.set(spoi.get(o));
+ tbOut.write(r);
+ return;
+ }
+ case INT: {
+ IntObjectInspector ioi = (IntObjectInspector)poi;
+ IntWritable r = reuse == null ? new IntWritable() : (IntWritable)reuse;
+ r.set(ioi.get(o));
+ tbOut.write(r);
+ return;
+ }
+ case LONG: {
+ LongObjectInspector loi = (LongObjectInspector)poi;
+ LongWritable r = reuse == null ? new LongWritable() : (LongWritable)reuse;
+ r.set(loi.get(o));
+ tbOut.write(r);
+ return;
+ }
+ case FLOAT: {
+ FloatObjectInspector foi = (FloatObjectInspector)poi;
+ FloatWritable r = reuse == null ? new FloatWritable() : (FloatWritable)reuse;
+ r.set(foi.get(o));
+ tbOut.write(r);
+ return;
+ }
+ case DOUBLE: {
+ DoubleObjectInspector doi = (DoubleObjectInspector)poi;
+ DoubleWritable r = reuse == null ? new DoubleWritable() : (DoubleWritable)reuse;
+ r.set(doi.get(o));
+ tbOut.write(r);
+ return;
+ }
+ case STRING: {
+ StringObjectInspector soi = (StringObjectInspector)poi;
+ Text t = soi.getPrimitiveWritableObject(o);
+ tbOut.write(t);
+ return;
+ }
+ default: {
+ throw new RuntimeException("Unrecognized type: " + poi.getPrimitiveCategory());
+ }
+ }
+ }
+ case LIST:
+ case MAP:
+ case STRUCT: {
+ // For complex object, serialize to JSON format
+ String s = SerDeUtils.getJSONString(o, oi);
+ Text t = reuse == null ? new Text() : (Text)reuse;
+
+ // convert to Text and write it
+ t.set(s);
+      tbOut.write(t);
+      // return, so that control does not fall through to the default case
+      return;
+    }
+ default: {
+ throw new RuntimeException("Unrecognized type: " + oi.getCategory());
+ }
+ }
+ }
+}
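
To make the framing concrete: serialize() above writes each primitive field as a typed-bytes value and then appends an end-of-record marker, so one row becomes one self-delimiting BytesWritable. A small sketch using the classes added in this commit (class name and sample values are illustrative only):

    import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer;
    import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableOutput;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    public class TypedBytesFramingSketch {
      public static void main(String[] args) throws Exception {
        NonSyncDataOutputBuffer buf = new NonSyncDataOutputBuffer();
        TypedBytesWritableOutput out = new TypedBytesWritableOutput(buf);

        // A two-column row such as (238, 'val_238'): each write() emits a
        // type code followed by the field payload.
        out.write(new IntWritable(238));
        out.write(new Text("val_238"));
        // The end-of-record marker is part of the data, exactly as in
        // TypedBytesSerDe.serialize().
        out.writeEndOfRecord();

        BytesWritable row = new BytesWritable();
        row.set(buf.getData(), 0, buf.getLength());
        System.out.println(row.getSize() + " bytes in this record");
      }
    }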
Added: hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q (added)
+++ hadoop/hive/trunk/contrib/src/test/queries/clientpositive/serde_typedbytes.q Mon Aug 24 18:23:19 2009
@@ -0,0 +1,27 @@
+add jar ../build/contrib/hive_contrib.jar;
+
+drop table dest1;
+CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM (
+ FROM src
+ SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+ USING '/bin/cat'
+ AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+ RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader'
+) tmap
+INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue;
+
+FROM (
+ FROM src
+ SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+ USING '/bin/cat'
+ AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+ RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader'
+) tmap
+INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue;
+
+SELECT dest1.* FROM dest1;
+
+drop table dest1;
Added: hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out (added)
+++ hadoop/hive/trunk/contrib/src/test/results/clientpositive/serde_typedbytes.q.out Mon Aug 24 18:23:19 2009
@@ -0,0 +1,618 @@
+query: drop table dest1
+query: CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE
+query: EXPLAIN
+FROM (
+ FROM src
+ SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+ USING '/bin/cat'
+ AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+ RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader'
+) tmap
+INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (. (TOK_TABLE_OR_COL src) key) (. (TOK_TABLE_OR_COL src) value)) (TOK_SERDE (TOK_SERDENAME 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe')) TOK_RECORDREADER '/bin/cat' (TOK_SERDE (TOK_SERDENAME 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe')) (TOK_RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader') (TOK_ALIASLIST tkey tvalue)))))) tmap)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB dest1)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL tkey)) (TOK_SELEXPR (TOK_TABLE_OR_COL tvalue)))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-4 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-4
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ tmap:src
+ TableScan
+ alias: src
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Transform Operator
+ command: /bin/cat
+ output info:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ Select Operator
+ expressions:
+ expr: tkey
+ type: string
+ expr: tvalue
+ type: string
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions:
+ expr: UDFToInteger(_col0)
+ type: int
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: dest1
+
+ Stage: Stage-4
+ Conditional Operator
+ list of dependent Tasks:
+ Move Operator
+ files:
+ hdfs directory: true
+ destination: file:/data/users/njain/hive1/hive1/build/ql/tmp/1595378189/10000
+ Map Reduce
+ Alias -> Map Operator Tree:
+ file:/data/users/njain/hive1/hive1/build/ql/tmp/945395579/10002
+ Reduce Output Operator
+ sort order:
+ Map-reduce partition columns:
+ expr: rand()
+ type: double
+ tag: -1
+ value expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ Reduce Operator Tree:
+ Extract
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: dest1
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ replace: true
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: dest1
+
+
+query: FROM (
+ FROM src
+ SELECT TRANSFORM(src.key, src.value) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+ USING '/bin/cat'
+ AS (tkey, tvalue) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+ RECORDREADER 'org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader'
+) tmap
+INSERT OVERWRITE TABLE dest1 SELECT tkey, tvalue
+Input: default/src
+Output: default/dest1
+query: SELECT dest1.* FROM dest1
+Input: default/dest1
+Output: file:/data/users/njain/hive1/hive1/build/ql/tmp/1097288414/10000
+238 val_238
+86 val_86
+311 val_311
+27 val_27
+165 val_165
+409 val_409
+255 val_255
+278 val_278
+98 val_98
+484 val_484
+265 val_265
+193 val_193
+401 val_401
+150 val_150
+273 val_273
+224 val_224
+369 val_369
+66 val_66
+128 val_128
+213 val_213
+146 val_146
+406 val_406
+429 val_429
+374 val_374
+152 val_152
+469 val_469
+145 val_145
+495 val_495
+37 val_37
+327 val_327
+281 val_281
+277 val_277
+209 val_209
+15 val_15
+82 val_82
+403 val_403
+166 val_166
+417 val_417
+430 val_430
+252 val_252
+292 val_292
+219 val_219
+287 val_287
+153 val_153
+193 val_193
+338 val_338
+446 val_446
+459 val_459
+394 val_394
+237 val_237
+482 val_482
+174 val_174
+413 val_413
+494 val_494
+207 val_207
+199 val_199
+466 val_466
+208 val_208
+174 val_174
+399 val_399
+396 val_396
+247 val_247
+417 val_417
+489 val_489
+162 val_162
+377 val_377
+397 val_397
+309 val_309
+365 val_365
+266 val_266
+439 val_439
+342 val_342
+367 val_367
+325 val_325
+167 val_167
+195 val_195
+475 val_475
+17 val_17
+113 val_113
+155 val_155
+203 val_203
+339 val_339
+0 val_0
+455 val_455
+128 val_128
+311 val_311
+316 val_316
+57 val_57
+302 val_302
+205 val_205
+149 val_149
+438 val_438
+345 val_345
+129 val_129
+170 val_170
+20 val_20
+489 val_489
+157 val_157
+378 val_378
+221 val_221
+92 val_92
+111 val_111
+47 val_47
+72 val_72
+4 val_4
+280 val_280
+35 val_35
+427 val_427
+277 val_277
+208 val_208
+356 val_356
+399 val_399
+169 val_169
+382 val_382
+498 val_498
+125 val_125
+386 val_386
+437 val_437
+469 val_469
+192 val_192
+286 val_286
+187 val_187
+176 val_176
+54 val_54
+459 val_459
+51 val_51
+138 val_138
+103 val_103
+239 val_239
+213 val_213
+216 val_216
+430 val_430
+278 val_278
+176 val_176
+289 val_289
+221 val_221
+65 val_65
+318 val_318
+332 val_332
+311 val_311
+275 val_275
+137 val_137
+241 val_241
+83 val_83
+333 val_333
+180 val_180
+284 val_284
+12 val_12
+230 val_230
+181 val_181
+67 val_67
+260 val_260
+404 val_404
+384 val_384
+489 val_489
+353 val_353
+373 val_373
+272 val_272
+138 val_138
+217 val_217
+84 val_84
+348 val_348
+466 val_466
+58 val_58
+8 val_8
+411 val_411
+230 val_230
+208 val_208
+348 val_348
+24 val_24
+463 val_463
+431 val_431
+179 val_179
+172 val_172
+42 val_42
+129 val_129
+158 val_158
+119 val_119
+496 val_496
+0 val_0
+322 val_322
+197 val_197
+468 val_468
+393 val_393
+454 val_454
+100 val_100
+298 val_298
+199 val_199
+191 val_191
+418 val_418
+96 val_96
+26 val_26
+165 val_165
+327 val_327
+230 val_230
+205 val_205
+120 val_120
+131 val_131
+51 val_51
+404 val_404
+43 val_43
+436 val_436
+156 val_156
+469 val_469
+468 val_468
+308 val_308
+95 val_95
+196 val_196
+288 val_288
+481 val_481
+457 val_457
+98 val_98
+282 val_282
+197 val_197
+187 val_187
+318 val_318
+318 val_318
+409 val_409
+470 val_470
+137 val_137
+369 val_369
+316 val_316
+169 val_169
+413 val_413
+85 val_85
+77 val_77
+0 val_0
+490 val_490
+87 val_87
+364 val_364
+179 val_179
+118 val_118
+134 val_134
+395 val_395
+282 val_282
+138 val_138
+238 val_238
+419 val_419
+15 val_15
+118 val_118
+72 val_72
+90 val_90
+307 val_307
+19 val_19
+435 val_435
+10 val_10
+277 val_277
+273 val_273
+306 val_306
+224 val_224
+309 val_309
+389 val_389
+327 val_327
+242 val_242
+369 val_369
+392 val_392
+272 val_272
+331 val_331
+401 val_401
+242 val_242
+452 val_452
+177 val_177
+226 val_226
+5 val_5
+497 val_497
+402 val_402
+396 val_396
+317 val_317
+395 val_395
+58 val_58
+35 val_35
+336 val_336
+95 val_95
+11 val_11
+168 val_168
+34 val_34
+229 val_229
+233 val_233
+143 val_143
+472 val_472
+322 val_322
+498 val_498
+160 val_160
+195 val_195
+42 val_42
+321 val_321
+430 val_430
+119 val_119
+489 val_489
+458 val_458
+78 val_78
+76 val_76
+41 val_41
+223 val_223
+492 val_492
+149 val_149
+449 val_449
+218 val_218
+228 val_228
+138 val_138
+453 val_453
+30 val_30
+209 val_209
+64 val_64
+468 val_468
+76 val_76
+74 val_74
+342 val_342
+69 val_69
+230 val_230
+33 val_33
+368 val_368
+103 val_103
+296 val_296
+113 val_113
+216 val_216
+367 val_367
+344 val_344
+167 val_167
+274 val_274
+219 val_219
+239 val_239
+485 val_485
+116 val_116
+223 val_223
+256 val_256
+263 val_263
+70 val_70
+487 val_487
+480 val_480
+401 val_401
+288 val_288
+191 val_191
+5 val_5
+244 val_244
+438 val_438
+128 val_128
+467 val_467
+432 val_432
+202 val_202
+316 val_316
+229 val_229
+469 val_469
+463 val_463
+280 val_280
+2 val_2
+35 val_35
+283 val_283
+331 val_331
+235 val_235
+80 val_80
+44 val_44
+193 val_193
+321 val_321
+335 val_335
+104 val_104
+466 val_466
+366 val_366
+175 val_175
+403 val_403
+483 val_483
+53 val_53
+105 val_105
+257 val_257
+406 val_406
+409 val_409
+190 val_190
+406 val_406
+401 val_401
+114 val_114
+258 val_258
+90 val_90
+203 val_203
+262 val_262
+348 val_348
+424 val_424
+12 val_12
+396 val_396
+201 val_201
+217 val_217
+164 val_164
+431 val_431
+454 val_454
+478 val_478
+298 val_298
+125 val_125
+431 val_431
+164 val_164
+424 val_424
+187 val_187
+382 val_382
+5 val_5
+70 val_70
+397 val_397
+480 val_480
+291 val_291
+24 val_24
+351 val_351
+255 val_255
+104 val_104
+70 val_70
+163 val_163
+438 val_438
+119 val_119
+414 val_414
+200 val_200
+491 val_491
+237 val_237
+439 val_439
+360 val_360
+248 val_248
+479 val_479
+305 val_305
+417 val_417
+199 val_199
+444 val_444
+120 val_120
+429 val_429
+169 val_169
+443 val_443
+323 val_323
+325 val_325
+277 val_277
+230 val_230
+478 val_478
+178 val_178
+468 val_468
+310 val_310
+317 val_317
+333 val_333
+493 val_493
+460 val_460
+207 val_207
+249 val_249
+265 val_265
+480 val_480
+83 val_83
+136 val_136
+353 val_353
+172 val_172
+214 val_214
+462 val_462
+233 val_233
+406 val_406
+133 val_133
+175 val_175
+189 val_189
+454 val_454
+375 val_375
+401 val_401
+421 val_421
+407 val_407
+384 val_384
+256 val_256
+26 val_26
+134 val_134
+67 val_67
+384 val_384
+379 val_379
+18 val_18
+462 val_462
+492 val_492
+100 val_100
+298 val_298
+9 val_9
+341 val_341
+498 val_498
+146 val_146
+458 val_458
+362 val_362
+186 val_186
+285 val_285
+348 val_348
+167 val_167
+18 val_18
+273 val_273
+183 val_183
+281 val_281
+344 val_344
+97 val_97
+469 val_469
+315 val_315
+84 val_84
+28 val_28
+37 val_37
+448 val_448
+152 val_152
+348 val_348
+307 val_307
+194 val_194
+414 val_414
+477 val_477
+222 val_222
+126 val_126
+90 val_90
+169 val_169
+403 val_403
+400 val_400
+200 val_200
+97 val_97
+query: drop table dest1
Modified: hadoop/hive/trunk/eclipse-templates/.classpath
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/eclipse-templates/.classpath?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/eclipse-templates/.classpath (original)
+++ hadoop/hive/trunk/eclipse-templates/.classpath Mon Aug 24 18:23:19 2009
@@ -25,6 +25,7 @@
<classpathentry exported="true" kind="lib" path="ql/lib/antlr-runtime-3.0.1.jar"/>
<classpathentry exported="true" kind="lib" path="testlibs/junit-3.8.1.jar"/>
<classpathentry kind="src" path="build/ql/gen-java"/>
+ <classpathentry kind="src" path="build/contrib/test/src"/>
<classpathentry kind="src" path="build/ql/test/src"/>
<classpathentry kind="src" path="cli/src/java"/>
<classpathentry kind="src" path="common/src/java"/>
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+
+public interface RecordReader {
+
+ public void initialize(InputStream in, Configuration conf) throws IOException;
+
+ public Writable createRow() throws IOException;
+
+ public int next(Writable row) throws IOException;
+
+ public void close() throws IOException;
+}
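
The contract this interface implies, judging from the ScriptOperator changes below: initialize() binds the reader to the script's output stream, createRow() allocates one reusable row object, and next() refills it, returning the number of bytes read (zero or less at end of stream). A hedged sketch of the consuming loop:

    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.RecordReader;
    import org.apache.hadoop.io.Writable;

    public class RecordReaderLoopSketch {
      // Drains a reader the way ScriptOperator.StreamThread does after this
      // commit: one reusable row, stop once next() reports <= 0 bytes.
      static void drain(RecordReader reader, InputStream in, Configuration conf)
          throws Exception {
        reader.initialize(in, conf);
        Writable row = reader.createRow();
        while (reader.next(row) > 0) {
          // hand `row` to a StreamProcessor here
        }
        reader.close();
      }
    }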
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Aug 24 18:23:19 2009
@@ -41,10 +41,11 @@
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.LineRecordReader.LineReader;
import org.apache.hadoop.util.StringUtils;
@@ -186,7 +187,7 @@
scriptOutputDeserializer = conf.getScriptOutputInfo().getDeserializerClass().newInstance();
scriptOutputDeserializer.initialize(hconf, conf.getScriptOutputInfo().getProperties());
-
+
scriptInputSerializer = (Serializer)conf.getScriptInputInfo().getDeserializerClass().newInstance();
scriptInputSerializer.initialize(hconf, conf.getScriptInputInfo().getProperties());
@@ -222,9 +223,17 @@
scriptOut = new DataOutputStream(new BufferedOutputStream(scriptPid.getOutputStream()));
scriptIn = new DataInputStream(new BufferedInputStream(scriptPid.getInputStream()));
scriptErr = new DataInputStream(new BufferedInputStream(scriptPid.getErrorStream()));
- outThread = new StreamThread(scriptIn, new OutputStreamProcessor(
+
+ RecordReader scriptOutputReader = conf.getOutRecordReaderClass().newInstance();
+ scriptOutputReader.initialize(scriptIn, hconf);
+
+ outThread = new StreamThread(scriptOutputReader, new OutputStreamProcessor(
scriptOutputDeserializer.getObjectInspector()), "OutputProcessor");
- errThread = new StreamThread(scriptErr,
+
+ RecordReader scriptErrReader = conf.getOutRecordReaderClass().newInstance();
+ scriptErrReader.initialize(scriptErr, hconf);
+
+ errThread = new StreamThread(scriptErrReader,
new ErrorStreamProcessor
(HiveConf.getIntVar(hconf, HiveConf.ConfVars.SCRIPTERRORLIMIT)),
"ErrorProcessor");
@@ -318,7 +327,7 @@
interface StreamProcessor {
- public void processLine(Text line) throws HiveException;
+ public void processLine(Writable line) throws HiveException;
public void close() throws HiveException;
}
@@ -329,7 +338,7 @@
public OutputStreamProcessor(ObjectInspector rowInspector) {
this.rowInspector = rowInspector;
}
- public void processLine(Text line) throws HiveException {
+ public void processLine(Writable line) throws HiveException {
try {
row = scriptOutputDeserializer.deserialize(line);
} catch (SerDeException e) {
@@ -360,10 +369,16 @@
lastReportTime = 0;
}
- public void processLine(Text line) throws HiveException {
+ public void processLine(Writable line) throws HiveException {
String stringLine = line.toString();
+ int len = 0;
+ if (line instanceof Text)
+ len = ((Text)line).getLength();
+ else if (line instanceof BytesWritable)
+ len = ((BytesWritable)line).getSize();
+
// Report progress for each stderr line, but no more frequently than once per minute.
long now = System.currentTimeMillis();
// reporter is a member variable of the Operator class.
@@ -375,11 +390,11 @@
if((maxBytes < 0) || (bytesCopied < maxBytes)) {
System.err.println(stringLine);
}
- if (bytesCopied < maxBytes && bytesCopied + line.getLength() >= maxBytes) {
+ if (bytesCopied < maxBytes && bytesCopied + len >= maxBytes) {
System.err.println("Operator " + id + " " + getName()
+ ": exceeding stderr limit of " + maxBytes + " bytes, will truncate stderr messages.");
}
- bytesCopied += line.getLength();
+ bytesCopied += len;
}
public void close() {
}
@@ -389,11 +404,11 @@
class StreamThread extends Thread {
- InputStream in;
+ RecordReader in;
StreamProcessor proc;
String name;
- StreamThread(InputStream in, StreamProcessor proc, String name) {
+ StreamThread(RecordReader in, StreamProcessor proc, String name) {
this.in = in;
this.proc = proc;
this.name = name;
@@ -401,14 +416,11 @@
}
public void run() {
- LineReader lineReader = null;
try {
- Text row = new Text();
- lineReader = new LineReader((InputStream)in, hconf);
+ Writable row = in.createRow();
while(true) {
- row.clear();
- long bytes = lineReader.readLine(row);
+ long bytes = in.next(row);
if(bytes <= 0) {
break;
}
@@ -421,10 +433,9 @@
LOG.warn(StringUtils.stringifyException(th));
} finally {
try {
- if(lineReader != null) {
- lineReader.close();
+ if (in != null) {
+ in.close();
}
- in.close();
proc.close();
} catch (Exception e) {
LOG.warn(name + ": error in closing ..");
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+
+public class TextRecordReader implements RecordReader {
+
+ private LineReader lineReader;
+ private InputStream in;
+ private Text row;
+
+ public void initialize(InputStream in, Configuration conf) throws IOException {
+ lineReader = new LineReader(in, conf);
+ this.in = in;
+ }
+
+ public Writable createRow() throws IOException {
+ row = new Text();
+ return row;
+ }
+
+ public int next(Writable row) throws IOException {
+ if (lineReader == null)
+ return -1;
+
+ return lineReader.readLine((Text)row);
+ }
+
+ public void close() throws IOException {
+ if (in != null)
+ in.close();
+ }
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TypedBytesRecordReader.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.util.typedbytes.Type;
+import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableInput;
+import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableOutput;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer;
+
+public class TypedBytesRecordReader implements RecordReader {
+
+ private DataInputStream din;
+ private TypedBytesWritableInput tbIn;
+
+ NonSyncDataOutputBuffer barrStr = new NonSyncDataOutputBuffer();
+ TypedBytesWritableOutput tbOut = new TypedBytesWritableOutput(barrStr);
+
+ ArrayList<Writable> row = new ArrayList<Writable>(0);
+
+ public void initialize(InputStream in, Configuration conf) throws IOException {
+ din = new DataInputStream(in);
+ tbIn = new TypedBytesWritableInput(din);
+ }
+
+ public Writable createRow() throws IOException {
+ BytesWritable retWrit = new BytesWritable();
+ return retWrit;
+ }
+
+ private Writable allocateWritable(Type type) {
+ switch (type) {
+ case BYTE:
+ return new ByteWritable();
+ case BOOL:
+ return new BooleanWritable();
+ case INT:
+ return new IntWritable();
+ case SHORT:
+ return new ShortWritable();
+ case LONG:
+ return new LongWritable();
+ case FLOAT:
+ return new FloatWritable();
+ case DOUBLE:
+ return new DoubleWritable();
+ case STRING:
+ return new Text();
+ default:
+ assert false; // not supported
+ }
+ return null;
+ }
+
+ public int next(Writable data) throws IOException {
+ int pos = 0;
+ barrStr.reset();
+
+ while (true) {
+ Type type = tbIn.readTypeCode();
+
+      // it was an empty stream
+ if (type == null)
+ return -1;
+
+ if (type == Type.ENDOFRECORD) {
+ tbOut.writeEndOfRecord();
+ if (barrStr.getLength() > 0)
+ ((BytesWritable)data).set(barrStr.getData(), 0, barrStr.getLength());
+ return barrStr.getLength();
+ }
+
+ if (pos >= row.size()) {
+ Writable wrt = allocateWritable(type);
+ assert pos == row.size();
+ row.add(wrt);
+ }
+
+ switch (type) {
+ case BYTE: {
+ ByteWritable bw = (ByteWritable)row.get(pos);
+ tbIn.readByte(bw);
+ tbOut.writeByte(bw);
+ break;
+ }
+ case BOOL: {
+ BooleanWritable bw = (BooleanWritable)row.get(pos);
+ tbIn.readBoolean(bw);
+ tbOut.writeBoolean(bw);
+ break;
+ }
+ case INT: {
+ IntWritable iw = (IntWritable)row.get(pos);
+ tbIn.readInt(iw);
+ tbOut.writeInt(iw);
+ break;
+ }
+ case SHORT: {
+ ShortWritable sw = (ShortWritable)row.get(pos);
+ tbIn.readShort(sw);
+ tbOut.writeShort(sw);
+ break;
+ }
+ case LONG: {
+ LongWritable lw = (LongWritable)row.get(pos);
+ tbIn.readLong(lw);
+ tbOut.writeLong(lw);
+ break;
+ }
+ case FLOAT: {
+ FloatWritable fw = (FloatWritable)row.get(pos);
+ tbIn.readFloat(fw);
+ tbOut.writeFloat(fw);
+ break;
+ }
+ case DOUBLE: {
+ DoubleWritable dw = (DoubleWritable)row.get(pos);
+ tbIn.readDouble(dw);
+ tbOut.writeDouble(dw);
+ break;
+ }
+ case STRING: {
+ Text txt = (Text)row.get(pos);
+ tbIn.readText(txt);
+ tbOut.writeText(txt);
+ break;
+ }
+ default:
+ assert false; // should never come here
+ }
+
+ pos++;
+ }
+ }
+
+ public void close() throws IOException {
+ if (din != null)
+ din.close();
+ }
+}
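
A round trip through the new reader, as a sketch: frame a record with TypedBytesWritableOutput (the same writeInt/writeText/writeEndOfRecord calls the reader uses to re-emit fields), then consume it; next() returns the record length, or -1 once the stream is exhausted. Class name and sample values are illustrative:

    import java.io.ByteArrayInputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader;
    import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer;
    import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesWritableOutput;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    public class TypedBytesRoundTripSketch {
      public static void main(String[] args) throws Exception {
        // Frame one (int, string) record the way a typed-bytes script would.
        NonSyncDataOutputBuffer buf = new NonSyncDataOutputBuffer();
        TypedBytesWritableOutput out = new TypedBytesWritableOutput(buf);
        out.writeInt(new IntWritable(238));
        out.writeText(new Text("val_238"));
        out.writeEndOfRecord();

        // Read it back through the new RecordReader.
        TypedBytesRecordReader reader = new TypedBytesRecordReader();
        reader.initialize(
            new ByteArrayInputStream(buf.getData(), 0, buf.getLength()),
            new Configuration());
        BytesWritable row = (BytesWritable) reader.createRow();
        System.out.println("record bytes: " + reader.next(row));
        reader.close();
      }
    }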
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g Mon Aug 24 18:23:19 2009
@@ -137,6 +137,7 @@
TOK_HINTARGLIST;
TOK_USERSCRIPTCOLNAMES;
TOK_USERSCRIPTCOLSCHEMA;
+TOK_RECORDREADER;
}
@@ -371,6 +372,14 @@
@after { msgs.pop(); }
: serdeFormat -> ^(TOK_SERDE serdeFormat)
| serdePropertiesFormat -> ^(TOK_SERDE serdePropertiesFormat)
+ | -> ^(TOK_SERDE)
+ ;
+
+recordReader
+@init { msgs.push("record reader specification"); }
+@after { msgs.pop(); }
+ : KW_RECORDREADER StringLiteral -> ^(TOK_RECORDREADER StringLiteral)
+ | -> ^(TOK_RECORDREADER)
;
serdeFormat
@@ -730,9 +739,11 @@
( KW_SELECT KW_TRANSFORM LPAREN selectExpressionList RPAREN
| KW_MAP selectExpressionList
| KW_REDUCE selectExpressionList )
- inSerde=serde? KW_USING StringLiteral
- ( KW_AS ((LPAREN (aliasList | columnNameTypeList) RPAREN) | (aliasList | columnNameTypeList)) outSerde=serde?)?
- -> ^(TOK_TRANSFORM selectExpressionList $inSerde? StringLiteral aliasList? columnNameTypeList? $outSerde?)
+ inSerde=serde
+ KW_USING StringLiteral
+ ( KW_AS ((LPAREN (aliasList | columnNameTypeList) RPAREN) | (aliasList | columnNameTypeList)))?
+ outSerde=serde outRec=recordReader
+ -> ^(TOK_TRANSFORM selectExpressionList $inSerde StringLiteral $outSerde $outRec aliasList? columnNameTypeList?)
;
selectExpression
@@ -1307,6 +1318,7 @@
KW_CONTINUE: 'CONTINUE';
KW_CURSOR: 'CURSOR';
KW_TRIGGER: 'TRIGGER';
+KW_RECORDREADER: 'RECORDREADER';
// Operators
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Aug 24 18:23:19 2009
@@ -51,6 +51,7 @@
import org.apache.hadoop.hive.ql.exec.MapRedTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RecordReader;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -132,6 +133,9 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.ql.exec.TextRecordReader;
+import org.apache.hadoop.hive.ql.exec.TypedBytesRecordReader;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -1114,18 +1118,18 @@
return cmd;
}
- private tableDesc getTableDescFromSerDe(ASTNode child, String cols, boolean defaultCols) throws SemanticException {
+ private tableDesc getTableDescFromSerDe(ASTNode child, String cols, String colTypes, boolean defaultCols) throws SemanticException {
if (child.getType() == HiveParser.TOK_SERDENAME) {
String serdeName = unescapeSQLString(child.getChild(0).getText());
Class<? extends Deserializer> serdeClass = null;
try {
- serdeClass = (Class<? extends Deserializer>)Class.forName(serdeName);
+ serdeClass = (Class<? extends Deserializer>)Class.forName(serdeName, true, JavaUtils.getClassLoader());
} catch (ClassNotFoundException e) {
throw new SemanticException(e);
}
- tableDesc tblDesc = PlanUtils.getTableDesc(serdeClass, Integer.toString(Utilities.tabCode), cols, defaultCols);
+ tableDesc tblDesc = PlanUtils.getTableDesc(serdeClass, Integer.toString(Utilities.tabCode), cols, colTypes, defaultCols, true);
// copy all the properties
if (child.getChildCount() == 2) {
ASTNode prop = (ASTNode)((ASTNode)child.getChild(1)).getChild(0);
@@ -1138,7 +1142,7 @@
return tblDesc;
}
else if (child.getType() == HiveParser.TOK_SERDEPROPS) {
- tableDesc tblDesc = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols, defaultCols);
+ tableDesc tblDesc = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols, colTypes, defaultCols);
int numChildRowFormat = child.getChildCount();
for (int numC = 0; numC < numChildRowFormat; numC++)
{
@@ -1173,112 +1177,136 @@
// should never come here
return null;
}
-
+
@SuppressWarnings("nls")
private Operator genScriptPlan(ASTNode trfm, QB qb,
Operator input) throws SemanticException {
// If there is no "AS" clause, the output schema will be "key,value"
ArrayList<ColumnInfo> outputCols = new ArrayList<ColumnInfo>();
- boolean defaultOutputColList = true;
- int inputSerDeChildNum = -1, outputSerDeChildNum = -1;
- int outputColumnNamesPos = -1, outputColumnSchemaPos = -1;
- int execPos = 1;
-
+ int inputSerDeNum = 1;
+ int outputSerDeNum = 3, outputRecordReaderNum = 4;
+ int outputColsNum = 5;
+ boolean outputColNames = false, outputColSchemas = false;
+ int execPos = 2;
+ boolean defaultOutputCols = false;
+
// Go over all the children
- for (int pos = 0; pos < trfm.getChildCount(); pos++) {
- ASTNode child = (ASTNode)trfm.getChild(pos);
- if (child.getType() == HiveParser.TOK_ALIASLIST) {
- defaultOutputColList = false;
- outputColumnNamesPos = pos;
- break;
- }
- else if (child.getType() == HiveParser.TOK_TABCOLLIST) {
- defaultOutputColList = false;
- outputColumnSchemaPos = pos;
- break;
- }
+ if (trfm.getChildCount() > outputColsNum) {
+ ASTNode outCols = (ASTNode)trfm.getChild(outputColsNum);
+ if (outCols.getType() == HiveParser.TOK_ALIASLIST)
+ outputColNames = true;
+ else if (outCols.getType() == HiveParser.TOK_TABCOLLIST)
+ outputColSchemas = true;
}
- // input serde specified
- if ((trfm.getChildCount() > 1) &&
- (trfm.getChild(1).getType() == HiveParser.TOK_SERDE)) {
- inputSerDeChildNum = 1;
- execPos++;
- }
-
- // output serde specified
- int checkChildNum = -1;
- if (outputColumnNamesPos >= 0)
- checkChildNum = outputColumnNamesPos + 1;
- else if (outputColumnSchemaPos >= 0)
- checkChildNum = outputColumnSchemaPos + 1;
-
- if (checkChildNum >= 0) {
- if ((trfm.getChildCount() > (checkChildNum))
- && (trfm.getChild(checkChildNum).getType() == HiveParser.TOK_SERDE))
- outputSerDeChildNum = checkChildNum;
- }
-
// If column type is not specified, use a string
- if (defaultOutputColList) {
+ if (!outputColNames && !outputColSchemas) {
outputCols.add(new ColumnInfo("key", TypeInfoFactory.stringTypeInfo, null, false));
outputCols.add(new ColumnInfo("value", TypeInfoFactory.stringTypeInfo, null, false));
+ defaultOutputCols = true;
}
- else if (outputColumnNamesPos >= 0) {
- ASTNode collist = (ASTNode) trfm.getChild(outputColumnNamesPos);
- int ccount = collist.getChildCount();
- for (int i=0; i < ccount; ++i) {
- outputCols.add(new ColumnInfo(unescapeIdentifier(((ASTNode)collist.getChild(i)).getText()), TypeInfoFactory.stringTypeInfo, null, false));
- }
- }
else {
- assert outputColumnSchemaPos >= 0;
- ASTNode collist = (ASTNode) trfm.getChild(outputColumnSchemaPos);
+ ASTNode collist = (ASTNode) trfm.getChild(outputColsNum);
int ccount = collist.getChildCount();
- for (int i=0; i < ccount; ++i) {
- ASTNode child = (ASTNode) collist.getChild(i);
- assert child.getType() == HiveParser.TOK_TABCOL;
- outputCols.add(new ColumnInfo(unescapeIdentifier(((ASTNode)child.getChild(0)).getText()),
- TypeInfoUtils.getTypeInfoFromTypeString(DDLSemanticAnalyzer.getTypeName(((ASTNode)child.getChild(1)).getType())), null, false));
+
+ if (outputColNames) {
+ for (int i=0; i < ccount; ++i) {
+ outputCols.add(new ColumnInfo(unescapeIdentifier(((ASTNode)collist.getChild(i)).getText()), TypeInfoFactory.stringTypeInfo, null, false));
+ }
+ }
+ else {
+ for (int i=0; i < ccount; ++i) {
+ ASTNode child = (ASTNode) collist.getChild(i);
+ assert child.getType() == HiveParser.TOK_TABCOL;
+ outputCols.add(new ColumnInfo(unescapeIdentifier(((ASTNode)child.getChild(0)).getText()),
+ TypeInfoUtils.getTypeInfoFromTypeString(DDLSemanticAnalyzer.getTypeName(((ASTNode)child.getChild(1)).getType())), null, false));
+ }
}
}
-
+
RowResolver out_rwsch = new RowResolver();
StringBuilder columns = new StringBuilder();
+ StringBuilder columnTypes = new StringBuilder();
+
for (int i = 0; i < outputCols.size(); ++i) {
if (i != 0) {
columns.append(",");
+ columnTypes.append(",");
}
+
columns.append(outputCols.get(i).getInternalName());
+ columnTypes.append(outputCols.get(i).getType().getTypeName());
+
out_rwsch.put(
qb.getParseInfo().getAlias(),
outputCols.get(i).getInternalName(),
outputCols.get(i));
}
+ StringBuilder inpColumns = new StringBuilder();
+ StringBuilder inpColumnTypes = new StringBuilder();
+ Vector<ColumnInfo> inputSchema = opParseCtx.get(input).getRR().getColumnInfos();
+ for (int i = 0; i < inputSchema.size(); ++i) {
+ if (i != 0) {
+ inpColumns.append(",");
+ inpColumnTypes.append(",");
+ }
+
+ inpColumns.append(inputSchema.get(i).getInternalName());
+ inpColumnTypes.append(inputSchema.get(i).getType().getTypeName());
+ }
+
tableDesc outInfo;
tableDesc inInfo;
+ String defaultSerdeName = conf.getVar(HiveConf.ConfVars.HIVESCRIPTSERDE);
+ Class<? extends Deserializer> serde;
- if (inputSerDeChildNum < 0)
- inInfo = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.tabCode), "");
- else
- inInfo = getTableDescFromSerDe((ASTNode)(((ASTNode)trfm.getChild(inputSerDeChildNum))).getChild(0), "", false);
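+ // Load the default script serde (HiveConf.ConfVars.HIVESCRIPTSERDE); it is used whenever no SERDE clause is given.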
+ try {
+ serde = (Class<? extends Deserializer>)Class.forName(defaultSerdeName, true, JavaUtils.getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new SemanticException(e);
+ }
- if (outputSerDeChildNum < 0)
- outInfo = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.tabCode), columns.toString(), defaultOutputColList);
+ // Input and Output Serdes
+ if (trfm.getChild(inputSerDeNum).getChildCount() > 0)
+ inInfo = getTableDescFromSerDe((ASTNode)(((ASTNode)trfm.getChild(inputSerDeNum))).getChild(0), inpColumns.toString(), inpColumnTypes.toString(), false);
else
- outInfo = getTableDescFromSerDe((ASTNode)(((ASTNode)trfm.getChild(outputSerDeChildNum))).getChild(0), columns.toString(), defaultOutputColList);
+ inInfo = PlanUtils.getTableDesc(serde, Integer.toString(Utilities.tabCode), inpColumns.toString(), inpColumnTypes.toString(), false, true);
+ if (trfm.getChild(outputSerDeNum).getChildCount() > 0)
+ outInfo = getTableDescFromSerDe((ASTNode)(((ASTNode)trfm.getChild(outputSerDeNum))).getChild(0), columns.toString(), columnTypes.toString(), false);
+ // This is for backward compatibility. If the user did not specify the output column list, we assume that there are 2 columns: key and value.
+ // However, if the script outputs col1, col2, col3 separated by TAB, the requirement is: key is col1 and value is (col2 TAB col3).
+ else
+ outInfo = PlanUtils.getTableDesc(serde, Integer.toString(Utilities.tabCode), columns.toString(), columnTypes.toString(), defaultOutputCols);
+
+ // Output record readers
+ Class<? extends RecordReader> outRecordReader = getRecordReader((ASTNode)trfm.getChild(outputRecordReaderNum));
+
Operator output = putOpInsertMap(OperatorFactory
.getAndMakeChild(
- new scriptDesc(
- getFixedCmd(stripQuotes(trfm.getChild(execPos).getText())),
- outInfo, inInfo),
+ new scriptDesc(getFixedCmd(stripQuotes(trfm.getChild(execPos).getText())),
+ inInfo, outInfo, outRecordReader),
new RowSchema(out_rwsch.getColumnInfos()), input), out_rwsch);
return output;
}
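+ /**
+ * Resolves the RecordReader used to read the script's output: the class named
+ * in the query's record reader clause when one is given, otherwise the default
+ * from HiveConf.ConfVars.HIVESCRIPTRECORDREADER.
+ */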
+ private Class<? extends RecordReader> getRecordReader(ASTNode node) throws SemanticException {
+ String name;
+
+ if (node.getChildCount() == 0)
+ name = conf.getVar(HiveConf.ConfVars.HIVESCRIPTRECORDREADER);
+ else
+ name = unescapeSQLString(node.getChild(0).getText());
+
+ try {
+ return (Class<? extends RecordReader>)Class.forName(name, true, JavaUtils.getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new SemanticException(e);
+ }
+ }
+
/**
* This function is a wrapper of parseInfo.getGroupByForClause which automatically
* translates SELECT DISTINCT a,b,c to SELECT a,b,c GROUP BY a,b,c.
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Mon Aug 24 18:23:19 2009
@@ -106,6 +106,13 @@
public static tableDesc getTableDesc(Class<? extends Deserializer> serdeClass,
String separatorCode, String columns, String columnTypes,
boolean lastColumnTakesRestOfTheLine) {
+ return getTableDesc(serdeClass, separatorCode, columns, columnTypes, lastColumnTakesRestOfTheLine, false);
+ }
+
+ public static tableDesc getTableDesc(Class<? extends Deserializer> serdeClass,
+ String separatorCode, String columns, String columnTypes,
+ boolean lastColumnTakesRestOfTheLine, boolean useJSONForLazy) {
+
Properties properties = Utilities.makeProperties(
Constants.SERIALIZATION_FORMAT, separatorCode,
Constants.LIST_COLUMNS, columns);
@@ -117,6 +124,16 @@
Constants.SERIALIZATION_LAST_COLUMN_TAKES_REST,
"true");
}
+
+ // This is not very clean and should be revisited later: for compatibility reasons,
+ // the user sees the results as JSON for custom scripts and has no way of specifying otherwise.
+ // Right now, it is hard-coded.
+ if (useJSONForLazy)
+ properties.setProperty(
+ Constants.SERIALIZATION_USE_JSON_OBJECTS,
+ "true");
+
+
return new tableDesc(
serdeClass,
TextInputFormat.class,
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java?rev=807330&r1=807329&r2=807330&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/scriptDesc.java Mon Aug 24 18:23:19 2009
@@ -20,6 +20,8 @@
import java.io.Serializable;
+import org.apache.hadoop.hive.ql.exec.RecordReader;
+
@explain(displayName="Transform Operator")
public class scriptDesc implements Serializable {
private static final long serialVersionUID = 1L;
@@ -28,16 +30,19 @@
private tableDesc scriptOutputInfo;
// Describe how to serialize data out to user script
private tableDesc scriptInputInfo;
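+ // Record reader used to deserialize the output of the user script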
+ private Class<? extends RecordReader> outRecordReaderClass;
public scriptDesc() { }
public scriptDesc(
final String scriptCmd,
+ final tableDesc scriptInputInfo,
final tableDesc scriptOutputInfo,
- final tableDesc scriptInputInfo) {
-
+ final Class<? extends RecordReader> outRecordReaderClass) {
+
this.scriptCmd = scriptCmd;
- this.scriptOutputInfo = scriptOutputInfo;
this.scriptInputInfo = scriptInputInfo;
+ this.scriptOutputInfo = scriptOutputInfo;
+ this.outRecordReaderClass = outRecordReaderClass;
}
@explain(displayName="command")
@@ -61,4 +66,17 @@
public void setScriptInputInfo(tableDesc scriptInputInfo) {
this.scriptInputInfo = scriptInputInfo;
}
+ /**
+ * @return the outRecordReaderClass
+ */
+ public Class<? extends RecordReader> getOutRecordReaderClass() {
+ return outRecordReaderClass;
+ }
+ /**
+ * @param outRecordReaderClass the outRecordReaderClass to set
+ */
+ public void setOutRecordReaderClass(
+ Class<? extends RecordReader> outRecordReaderClass) {
+ this.outRecordReaderClass = outRecordReaderClass;
+ }
}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/Type.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util.typedbytes;
+
+/**
+ * The possible type codes.
+ */
+public enum Type {
+
+ // codes for supported types (< 50):
+ BYTES(0),
+ BYTE(1),
+ BOOL(2),
+ INT(3),
+ LONG(4),
+ FLOAT(5),
+ DOUBLE(6),
+ STRING(7),
+ VECTOR(8),
+ LIST(9),
+ MAP(10),
+ SHORT(11),
+
+ // application-specific codes (50-200):
+ WRITABLE(50),
+
+ ENDOFRECORD(100),
+
+ // low-level codes (> 200):
+ MARKER(255);
+
+ final int code;
+
+ Type(int code) {
+ this.code = code;
+ }
+}
\ No newline at end of file
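
A note on the wire format implied by the codes above: every typed-bytes value is a
one-byte type code followed by a type-specific payload; an int, for instance, is
code 3 followed by four big-endian bytes. A minimal standalone sketch, using only
java.io (an illustration only, not part of this patch):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class TypedBytesIntDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeByte(3);   // Type.INT.code
        out.writeInt(1234); // 4-byte big-endian payload
        for (byte b : buf.toByteArray()) {
          System.out.printf("%02x ", b); // prints "03 00 00 04 d2"
        }
      }
    }

These five bytes are exactly what TypedBytesInput.readRawInt() below hands back.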
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java?rev=807330&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/typedbytes/TypedBytesInput.java Mon Aug 24 18:23:19 2009
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util.typedbytes;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.record.Buffer;
+
+/**
+ * Provides functionality for reading typed bytes.
+ */
+public class TypedBytesInput {
+
+ private DataInput in;
+
+ private TypedBytesInput() {}
+
+ private void setDataInput(DataInput in) {
+ this.in = in;
+ }
+
+ private static final ThreadLocal<TypedBytesInput> tbIn = new ThreadLocal<TypedBytesInput>() {
+ @Override
+ protected TypedBytesInput initialValue() {
+ return new TypedBytesInput();
+ }
+ };
+
+ /**
+ * Get a thread-local typed bytes input for the supplied {@link DataInput}.
+ * @param in data input object
+ * @return typed bytes input corresponding to the supplied {@link DataInput}.
+ */
+ public static TypedBytesInput get(DataInput in) {
+ TypedBytesInput bin = tbIn.get();
+ bin.setDataInput(in);
+ return bin;
+ }
+
+ /** Creates a new instance of TypedBytesInput. */
+ public TypedBytesInput(DataInput in) {
+ this.in = in;
+ }
+
+ /**
+ * Reads a typed bytes sequence and converts it to a Java object. The first
+ * byte is interpreted as a type code, and then the right number of
+ * subsequent bytes are read depending on the obtained type.
+ * @return the obtained object or null when the end of the file is reached
+ * @throws IOException
+ */
+ public Object read() throws IOException {
+ int code = -1;
+ try {
+ code = in.readUnsignedByte();
+ } catch (EOFException eof) {
+ return null;
+ }
+ if (code == Type.BYTES.code) {
+ return new Buffer(readBytes());
+ } else if (code == Type.BYTE.code) {
+ return readByte();
+ } else if (code == Type.BOOL.code) {
+ return readBool();
+ } else if (code == Type.INT.code) {
+ return readInt();
+ } else if (code == Type.SHORT.code) {
+ return readShort();
+ } else if (code == Type.LONG.code) {
+ return readLong();
+ } else if (code == Type.FLOAT.code) {
+ return readFloat();
+ } else if (code == Type.DOUBLE.code) {
+ return readDouble();
+ } else if (code == Type.STRING.code) {
+ return readString();
+ } else if (code == Type.VECTOR.code) {
+ return readVector();
+ } else if (code == Type.LIST.code) {
+ return readList();
+ } else if (code == Type.MAP.code) {
+ return readMap();
+ } else if (code == Type.MARKER.code) {
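+ // a marker byte terminates a list; surface it as null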
+ return null;
+ } else if (50 <= code && code <= 200) { // application-specific typecodes
+ return new Buffer(readBytes());
+ } else {
+ throw new RuntimeException("unknown type code: " + code);
+ }
+ }
+
+ /**
+ * Reads a typed bytes sequence. The first byte is interpreted as a type code,
+ * and then the right number of subsequent bytes are read depending on the
+ * obtained type.
+ *
+ * @return the obtained typed bytes sequence or null when the end of the file
+ * is reached
+ * @throws IOException
+ */
+ public byte[] readRaw() throws IOException {
+ int code = -1;
+ try {
+ code = in.readUnsignedByte();
+ } catch (EOFException eof) {
+ return null;
+ }
+ if (code == Type.BYTES.code) {
+ return readRawBytes();
+ } else if (code == Type.BYTE.code) {
+ return readRawByte();
+ } else if (code == Type.BOOL.code) {
+ return readRawBool();
+ } else if (code == Type.INT.code) {
+ return readRawInt();
+ } else if (code == Type.SHORT.code) {
+ return readRawShort();
+ } else if (code == Type.LONG.code) {
+ return readRawLong();
+ } else if (code == Type.FLOAT.code) {
+ return readRawFloat();
+ } else if (code == Type.DOUBLE.code) {
+ return readRawDouble();
+ } else if (code == Type.STRING.code) {
+ return readRawString();
+ } else if (code == Type.VECTOR.code) {
+ return readRawVector();
+ } else if (code == Type.LIST.code) {
+ return readRawList();
+ } else if (code == Type.MAP.code) {
+ return readRawMap();
+ } else if (code == Type.MARKER.code) {
+ return null;
+ } else if (50 <= code && code <= 200) { // application-specific typecodes
+ return readRawBytes();
+ } else {
+ throw new RuntimeException("unknown type code: " + code);
+ }
+ }
+
+ /**
+ * Reads a type byte and returns the corresponding {@link Type}.
+ * @return the obtained Type or null when the end of the file is reached
+ * @throws IOException
+ */
+ public Type readType() throws IOException {
+ int code = -1;
+ try {
+ code = in.readUnsignedByte();
+ } catch (EOFException eof) {
+ return null;
+ }
+ for (Type type : Type.values()) {
+ if (type.code == code) {
+ return type;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Skips a type byte.
+ * @return true iff the end of the file was not reached
+ * @throws IOException
+ */
+ public boolean skipType() throws IOException {
+ try {
+ in.readByte();
+ return true;
+ } catch (EOFException eof) {
+ return false;
+ }
+ }
+
+ /**
+ * Reads the bytes following a <code>Type.BYTES</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readBytes() throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ return bytes;
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.BYTES</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawBytes() throws IOException {
+ int length = in.readInt();
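+ // Re-emit the value verbatim: type-code byte, 4-byte big-endian length, then the payload.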
+ byte[] bytes = new byte[5 + length];
+ bytes[0] = (byte) Type.BYTES.code;
+ bytes[1] = (byte) (0xff & (length >> 24));
+ bytes[2] = (byte) (0xff & (length >> 16));
+ bytes[3] = (byte) (0xff & (length >> 8));
+ bytes[4] = (byte) (0xff & length);
+ in.readFully(bytes, 5, length);
+ return bytes;
+ }
+
+ /**
+ * Reads the byte following a <code>Type.BYTE</code> code.
+ * @return the obtained byte
+ * @throws IOException
+ */
+ public byte readByte() throws IOException {
+ return in.readByte();
+ }
+
+ /**
+ * Reads the raw byte following a <code>Type.BYTE</code> code.
+ * @return the obtained byte
+ * @throws IOException
+ */
+ public byte[] readRawByte() throws IOException {
+ byte[] bytes = new byte[2];
+ bytes[0] = (byte) Type.BYTE.code;
+ in.readFully(bytes, 1, 1);
+ return bytes;
+ }
+
+ /**
+ * Reads the boolean following a <code>Type.BOOL</code> code.
+ * @return the obtained boolean
+ * @throws IOException
+ */
+ public boolean readBool() throws IOException {
+ return in.readBoolean();
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.BOOL</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawBool() throws IOException {
+ byte[] bytes = new byte[2];
+ bytes[0] = (byte) Type.BOOL.code;
+ in.readFully(bytes, 1, 1);
+ return bytes;
+ }
+
+ /**
+ * Reads the integer following a <code>Type.INT</code> code.
+ * @return the obtained integer
+ * @throws IOException
+ */
+ public int readInt() throws IOException {
+ return in.readInt();
+ }
+
+ /**
+ * Reads the short following a <code>Type.SHORT</code> code.
+ * @return the obtained short
+ * @throws IOException
+ */
+ public short readShort() throws IOException {
+ return in.readShort();
+ }
+
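+ /**
+ * Reads the raw bytes following a <code>Type.SHORT</code> code
+ * (added so that {@link #readRaw()} can pass shorts through; mirrors readRawInt).
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawShort() throws IOException {
+ byte[] bytes = new byte[3];
+ bytes[0] = (byte) Type.SHORT.code;
+ in.readFully(bytes, 1, 2);
+ return bytes;
+ }
+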
+ /**
+ * Reads the raw bytes following a <code>Type.INT</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawInt() throws IOException {
+ byte[] bytes = new byte[5];
+ bytes[0] = (byte) Type.INT.code;
+ in.readFully(bytes, 1, 4);
+ return bytes;
+ }
+
+ /**
+ * Reads the long following a <code>Type.LONG</code> code.
+ * @return the obtained long
+ * @throws IOException
+ */
+ public long readLong() throws IOException {
+ return in.readLong();
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.LONG</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawLong() throws IOException {
+ byte[] bytes = new byte[9];
+ bytes[0] = (byte) Type.LONG.code;
+ in.readFully(bytes, 1, 8);
+ return bytes;
+ }
+
+ /**
+ * Reads the float following a <code>Type.FLOAT</code> code.
+ * @return the obtained float
+ * @throws IOException
+ */
+ public float readFloat() throws IOException {
+ return in.readFloat();
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.FLOAT</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawFloat() throws IOException {
+ byte[] bytes = new byte[5];
+ bytes[0] = (byte) Type.FLOAT.code;
+ in.readFully(bytes, 1, 4);
+ return bytes;
+ }
+
+ /**
+ * Reads the double following a <code>Type.DOUBLE</code> code.
+ * @return the obtained double
+ * @throws IOException
+ */
+ public double readDouble() throws IOException {
+ return in.readDouble();
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.DOUBLE</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawDouble() throws IOException {
+ byte[] bytes = new byte[9];
+ bytes[0] = (byte) Type.DOUBLE.code;
+ in.readFully(bytes, 1, 8);
+ return bytes;
+ }
+
+ /**
+ * Reads the string following a <code>Type.STRING</code> code.
+ * @return the obtained string
+ * @throws IOException
+ */
+ public String readString() throws IOException {
+ return WritableUtils.readString(in);
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.STRING</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawString() throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[5 + length];
+ bytes[0] = (byte) Type.STRING.code;
+ bytes[1] = (byte) (0xff & (length >> 24));
+ bytes[2] = (byte) (0xff & (length >> 16));
+ bytes[3] = (byte) (0xff & (length >> 8));
+ bytes[4] = (byte) (0xff & length);
+ in.readFully(bytes, 5, length);
+ return bytes;
+ }
+
+ /**
+ * Reads the vector following a <code>Type.VECTOR</code> code.
+ * @return the obtained vector
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public ArrayList readVector() throws IOException {
+ int length = readVectorHeader();
+ ArrayList result = new ArrayList(length);
+ for (int i = 0; i < length; i++) {
+ result.add(read());
+ }
+ return result;
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.VECTOR</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawVector() throws IOException {
+ Buffer buffer = new Buffer();
+ int length = readVectorHeader();
+ buffer.append(new byte[] {
+ (byte) Type.VECTOR.code,
+ (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)),
+ (byte) (0xff & (length >> 8)), (byte) (0xff & length)
+ });
+ for (int i = 0; i < length; i++) {
+ buffer.append(readRaw());
+ }
+ return buffer.get();
+ }
+
+ /**
+ * Reads the header following a <code>Type.VECTOR</code> code.
+ * @return the number of elements in the vector
+ * @throws IOException
+ */
+ public int readVectorHeader() throws IOException {
+ return in.readInt();
+ }
+
+ /**
+ * Reads the list following a <code>Type.LIST</code> code.
+ * @return the obtained list
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public List readList() throws IOException {
+ List list = new ArrayList();
+ Object obj = read();
+ while (obj != null) {
+ list.add(obj);
+ obj = read();
+ }
+ return list;
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.LIST</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawList() throws IOException {
+ Buffer buffer = new Buffer(new byte[] { (byte) Type.LIST.code });
+ byte[] bytes = readRaw();
+ while (bytes != null) {
+ buffer.append(bytes);
+ bytes = readRaw();
+ }
+ buffer.append(new byte[] { (byte) Type.MARKER.code });
+ return buffer.get();
+ }
+
+ /**
+ * Reads the map following a <code>Type.MAP</code> code.
+ * @return the obtained map
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public TreeMap readMap() throws IOException {
+ int length = readMapHeader();
+ TreeMap result = new TreeMap();
+ for (int i = 0; i < length; i++) {
+ Object key = read();
+ Object value = read();
+ result.put(key, value);
+ }
+ return result;
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.MAP</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawMap() throws IOException {
+ Buffer buffer = new Buffer();
+ int length = readMapHeader();
+ buffer.append(new byte[] {
+ (byte) Type.MAP.code,
+ (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)),
+ (byte) (0xff & (length >> 8)), (byte) (0xff & length)
+ });
+ for (int i = 0; i < length; i++) {
+ buffer.append(readRaw());
+ buffer.append(readRaw());
+ }
+ return buffer.get();
+ }
+
+ /**
+ * Reads the header following a <code>Type.MAP</code> code.
+ * @return the number of key-value pairs in the map
+ * @throws IOException
+ */
+ public int readMapHeader() throws IOException {
+ return in.readInt();
+ }
+
+}
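
A minimal usage sketch for the reader above (an illustration only, not part of this
patch): wrap any DataInput and call read() until it returns null at end of stream.

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.hive.ql.util.typedbytes.TypedBytesInput;

    public class TypedBytesDump {
      public static void main(String[] args) throws IOException {
        DataInputStream in = new DataInputStream(new FileInputStream(args[0]));
        // get() reuses a thread-local TypedBytesInput rather than allocating one per call
        TypedBytesInput tbin = TypedBytesInput.get(in);
        Object obj = tbin.read();
        while (obj != null) { // null means end of stream (or a marker byte)
          System.out.println(obj);
          obj = tbin.read();
        }
        in.close();
      }
    }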